Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Display last 10 posts from connected Facebook page with engagement data (likes, comments, shares, reactions). On-demand insights button loads impressions, reach, engaged users, and clicks per post. In-memory cache with 5-min TTL prevents API rate limit issues. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
503 lines
19 KiB
Python
503 lines
19 KiB
Python
"""
|
|
Facebook + Instagram Graph API Client
|
|
======================================
|
|
|
|
Uses OAuth 2.0 page tokens to access Facebook Page and Instagram Business data.
|
|
|
|
API docs: https://developers.facebook.com/docs/graph-api/
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FacebookGraphService:
|
|
"""Facebook + Instagram Graph API client."""
|
|
|
|
BASE_URL = "https://graph.facebook.com/v21.0"
|
|
|
|
def __init__(self, access_token: str):
|
|
self.access_token = access_token
|
|
self.session = requests.Session()
|
|
self.session.timeout = 15
|
|
|
|
def _get(self, endpoint: str, params: dict = None) -> Optional[Dict]:
|
|
"""Make authenticated GET request."""
|
|
params = params or {}
|
|
params['access_token'] = self.access_token
|
|
try:
|
|
resp = self.session.get(f"{self.BASE_URL}/{endpoint}", params=params)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except Exception as e:
|
|
logger.error(f"Facebook API {endpoint} failed: {e}")
|
|
return None
|
|
|
|
def get_managed_pages(self) -> List[Dict]:
|
|
"""Get Facebook pages managed by the authenticated user."""
|
|
data = self._get('me/accounts', {'fields': 'id,name,category,access_token,fan_count'})
|
|
return data.get('data', []) if data else []
|
|
|
|
def get_page_info(self, page_id: str) -> Optional[Dict]:
|
|
"""Get detailed page information."""
|
|
return self._get(page_id, {
|
|
'fields': 'id,name,fan_count,category,link,about,website,phone,single_line_address,followers_count'
|
|
})
|
|
|
|
def get_page_insights(self, page_id: str, days: int = 28) -> Dict:
|
|
"""Get page insights (impressions, engaged users, reactions).
|
|
|
|
Note: Requires page access token, not user access token.
|
|
The page token should be stored during OAuth connection.
|
|
"""
|
|
since = datetime.now() - timedelta(days=days)
|
|
until = datetime.now()
|
|
|
|
metrics = 'page_impressions,page_engaged_users,page_fans,page_views_total'
|
|
data = self._get(f'{page_id}/insights', {
|
|
'metric': metrics,
|
|
'period': 'day',
|
|
'since': int(since.timestamp()),
|
|
'until': int(until.timestamp()),
|
|
})
|
|
|
|
if not data:
|
|
return {}
|
|
|
|
result = {}
|
|
for metric in data.get('data', []):
|
|
name = metric.get('name', '')
|
|
values = metric.get('values', [])
|
|
if values:
|
|
total = sum(v.get('value', 0) for v in values if isinstance(v.get('value'), (int, float)))
|
|
result[name] = total
|
|
|
|
return result
|
|
|
|
def get_instagram_account(self, page_id: str) -> Optional[str]:
|
|
"""Get linked Instagram Business account ID from a Facebook Page."""
|
|
data = self._get(page_id, {'fields': 'instagram_business_account'})
|
|
if data and 'instagram_business_account' in data:
|
|
return data['instagram_business_account'].get('id')
|
|
return None
|
|
|
|
def get_ig_media_insights(self, ig_account_id: str, days: int = 28) -> Dict:
|
|
"""Get Instagram account insights.
|
|
|
|
Returns follower_count, media_count, and recent media engagement.
|
|
"""
|
|
result = {}
|
|
|
|
# Basic account info
|
|
account_data = self._get(ig_account_id, {
|
|
'fields': 'followers_count,media_count,username,biography'
|
|
})
|
|
if account_data:
|
|
result['followers_count'] = account_data.get('followers_count', 0)
|
|
result['media_count'] = account_data.get('media_count', 0)
|
|
result['username'] = account_data.get('username', '')
|
|
|
|
# Account insights (reach, impressions)
|
|
since = datetime.now() - timedelta(days=days)
|
|
until = datetime.now()
|
|
insights_data = self._get(f'{ig_account_id}/insights', {
|
|
'metric': 'impressions,reach,follower_count',
|
|
'period': 'day',
|
|
'since': int(since.timestamp()),
|
|
'until': int(until.timestamp()),
|
|
})
|
|
|
|
if insights_data:
|
|
for metric in insights_data.get('data', []):
|
|
name = metric.get('name', '')
|
|
values = metric.get('values', [])
|
|
if values:
|
|
total = sum(v.get('value', 0) for v in values if isinstance(v.get('value'), (int, float)))
|
|
result[f'ig_{name}_total'] = total
|
|
|
|
return result
|
|
|
|
# ============================================================
|
|
# PUBLISHING METHODS (Social Publisher)
|
|
# ============================================================
|
|
|
|
def _post(self, endpoint: str, data: dict = None, files: dict = None) -> Optional[Dict]:
|
|
"""Make authenticated POST request."""
|
|
data = data or {}
|
|
params = {'access_token': self.access_token}
|
|
try:
|
|
if files:
|
|
resp = self.session.post(f"{self.BASE_URL}/{endpoint}", params=params, data=data, files=files)
|
|
else:
|
|
resp = self.session.post(f"{self.BASE_URL}/{endpoint}", params=params, data=data)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
error_data = None
|
|
try:
|
|
error_data = e.response.json()
|
|
except Exception:
|
|
pass
|
|
logger.error(f"Facebook API POST {endpoint} failed: {e}, response: {error_data}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Facebook API POST {endpoint} failed: {e}")
|
|
return None
|
|
|
|
def _delete(self, endpoint: str) -> Optional[Dict]:
|
|
"""Make authenticated DELETE request."""
|
|
params = {'access_token': self.access_token}
|
|
try:
|
|
resp = self.session.delete(f"{self.BASE_URL}/{endpoint}", params=params)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except Exception as e:
|
|
logger.error(f"Facebook API DELETE {endpoint} failed: {e}")
|
|
return None
|
|
|
|
def upload_photo_unpublished(self, page_id: str, image_path: str) -> Optional[str]:
|
|
"""Upload photo as unpublished to get photo_id for feed attachment.
|
|
|
|
Two-step process:
|
|
1. Upload photo with published=false -> get photo_id
|
|
2. Use photo_id in create_post() attached_media
|
|
|
|
Args:
|
|
page_id: Facebook Page ID
|
|
image_path: Local path to image file
|
|
|
|
Returns:
|
|
Photo ID (media_fbid) or None on failure
|
|
"""
|
|
try:
|
|
with open(image_path, 'rb') as f:
|
|
files = {'source': (image_path.split('/')[-1], f, 'image/png')}
|
|
result = self._post(f'{page_id}/photos', data={'published': 'false'}, files=files)
|
|
if result and 'id' in result:
|
|
logger.info(f"Uploaded unpublished photo: {result['id']}")
|
|
return result['id']
|
|
logger.error(f"Photo upload failed: {result}")
|
|
return None
|
|
except FileNotFoundError:
|
|
logger.error(f"Image file not found: {image_path}")
|
|
return None
|
|
|
|
def create_post(self, page_id: str, message: str, image_path: str = None,
|
|
published: bool = True, scheduled_time: int = None) -> Optional[Dict]:
|
|
"""Create a post on Facebook Page feed.
|
|
|
|
For posts with images, uses two-step process:
|
|
1. Upload photo as unpublished -> photo_id
|
|
2. Create feed post with attached_media[0]
|
|
|
|
Args:
|
|
page_id: Facebook Page ID
|
|
message: Post text content
|
|
image_path: Optional local path to image
|
|
published: If False, creates draft (visible only to page admins)
|
|
scheduled_time: Unix timestamp for scheduled publishing (min 10 min ahead)
|
|
|
|
Returns:
|
|
API response dict with 'id' key, or None on failure
|
|
"""
|
|
data = {'message': message}
|
|
|
|
# Handle image upload
|
|
if image_path:
|
|
photo_id = self.upload_photo_unpublished(page_id, image_path)
|
|
if photo_id:
|
|
data['attached_media[0]'] = json.dumps({'media_fbid': photo_id})
|
|
else:
|
|
logger.warning("Image upload failed, posting without image")
|
|
|
|
# Handle scheduling
|
|
if scheduled_time:
|
|
data['published'] = 'false'
|
|
data['scheduled_publish_time'] = str(scheduled_time)
|
|
elif not published:
|
|
data['published'] = 'false'
|
|
|
|
result = self._post(f'{page_id}/feed', data=data)
|
|
if result and 'id' in result:
|
|
logger.info(f"Created post: {result['id']} (published={published})")
|
|
return result
|
|
|
|
def publish_draft(self, post_id: str) -> Optional[Dict]:
|
|
"""Publish an unpublished (draft) post.
|
|
|
|
Args:
|
|
post_id: Facebook post ID (format: PAGE_ID_POST_ID)
|
|
|
|
Returns:
|
|
API response or None on failure
|
|
"""
|
|
return self._post(post_id, data={'is_published': 'true'})
|
|
|
|
def unpublish_post(self, post_id: str) -> Optional[Dict]:
|
|
"""Unpublish a public post (make it draft/hidden).
|
|
|
|
Args:
|
|
post_id: Facebook post ID (format: PAGE_ID_POST_ID)
|
|
|
|
Returns:
|
|
API response or None on failure
|
|
"""
|
|
return self._post(post_id, data={'is_published': 'false'})
|
|
|
|
def get_post_engagement(self, post_id: str) -> Optional[Dict]:
|
|
"""Get engagement metrics for a published post.
|
|
|
|
Args:
|
|
post_id: Facebook post ID
|
|
|
|
Returns:
|
|
Dict with likes, comments, shares, reactions_total or None
|
|
"""
|
|
fields = 'id,created_time,message,likes.summary(true),comments.summary(true),shares,reactions.summary(true).limit(0)'
|
|
result = self._get(post_id, {'fields': fields})
|
|
if not result:
|
|
return None
|
|
|
|
return {
|
|
'post_id': result.get('id'),
|
|
'created_time': result.get('created_time'),
|
|
'likes': result.get('likes', {}).get('summary', {}).get('total_count', 0),
|
|
'comments': result.get('comments', {}).get('summary', {}).get('total_count', 0),
|
|
'shares': result.get('shares', {}).get('count', 0) if result.get('shares') else 0,
|
|
'reactions_total': result.get('reactions', {}).get('summary', {}).get('total_count', 0),
|
|
}
|
|
|
|
def get_page_posts(self, page_id: str, limit: int = 10) -> Optional[List[Dict]]:
|
|
"""Get recent posts from a Facebook Page with engagement metrics.
|
|
|
|
Args:
|
|
page_id: Facebook Page ID
|
|
limit: Number of posts to fetch (max 100)
|
|
|
|
Returns:
|
|
List of post dicts or None on failure
|
|
"""
|
|
fields = (
|
|
'id,message,created_time,full_picture,permalink_url,status_type,'
|
|
'likes.summary(true).limit(0),comments.summary(true).limit(0),'
|
|
'shares,reactions.summary(true).limit(0)'
|
|
)
|
|
result = self._get(f'{page_id}/posts', {'fields': fields, 'limit': limit})
|
|
if not result:
|
|
return None
|
|
|
|
posts = []
|
|
for item in result.get('data', []):
|
|
posts.append({
|
|
'id': item.get('id'),
|
|
'message': item.get('message', ''),
|
|
'created_time': item.get('created_time'),
|
|
'full_picture': item.get('full_picture'),
|
|
'permalink_url': item.get('permalink_url'),
|
|
'status_type': item.get('status_type', ''),
|
|
'likes': item.get('likes', {}).get('summary', {}).get('total_count', 0),
|
|
'comments': item.get('comments', {}).get('summary', {}).get('total_count', 0),
|
|
'shares': item.get('shares', {}).get('count', 0) if item.get('shares') else 0,
|
|
'reactions_total': item.get('reactions', {}).get('summary', {}).get('total_count', 0),
|
|
})
|
|
return posts
|
|
|
|
def get_post_insights_metrics(self, post_id: str) -> Optional[Dict]:
|
|
"""Get detailed insights metrics for a specific post.
|
|
|
|
Note: Only available for posts on Pages with 100+ fans.
|
|
|
|
Args:
|
|
post_id: Facebook post ID
|
|
|
|
Returns:
|
|
Dict with impressions, reach, engaged_users, clicks or None
|
|
"""
|
|
metrics = 'post_impressions,post_impressions_unique,post_engaged_users,post_clicks'
|
|
result = self._get(f'{post_id}/insights', {'metric': metrics})
|
|
if not result:
|
|
return None
|
|
|
|
insights = {}
|
|
for metric in result.get('data', []):
|
|
name = metric.get('name', '')
|
|
values = metric.get('values', [])
|
|
if values:
|
|
# Lifetime metrics have a single value
|
|
value = values[0].get('value', 0)
|
|
if name == 'post_impressions':
|
|
insights['impressions'] = value
|
|
elif name == 'post_impressions_unique':
|
|
insights['reach'] = value
|
|
elif name == 'post_engaged_users':
|
|
insights['engaged_users'] = value
|
|
elif name == 'post_clicks':
|
|
insights['clicks'] = value
|
|
|
|
return insights if insights else None
|
|
|
|
def delete_post(self, post_id: str) -> bool:
|
|
"""Delete a post (published or unpublished).
|
|
|
|
Args:
|
|
post_id: Facebook post ID
|
|
|
|
Returns:
|
|
True if deleted successfully
|
|
"""
|
|
result = self._delete(post_id)
|
|
return bool(result and result.get('success'))
|
|
|
|
|
|
# ============================================================
|
|
# SYNC: Facebook Page → CompanySocialMedia
|
|
# ============================================================
|
|
|
|
def sync_facebook_to_social_media(db, company_id: int) -> dict:
|
|
"""Fetch Facebook page stats via Graph API and upsert into CompanySocialMedia.
|
|
|
|
Requires an active OAuth token and SocialMediaConfig for the company.
|
|
Called automatically after page selection and manually via sync endpoint.
|
|
|
|
Args:
|
|
db: SQLAlchemy session
|
|
company_id: Company ID to sync data for
|
|
|
|
Returns:
|
|
dict with 'success' bool and either 'data' or 'error'
|
|
"""
|
|
from oauth_service import OAuthService
|
|
from database import SocialMediaConfig, CompanySocialMedia
|
|
|
|
# 1. Get valid page access token
|
|
oauth = OAuthService()
|
|
token = oauth.get_valid_token(db, company_id, 'meta', 'facebook')
|
|
if not token:
|
|
return {'success': False, 'error': 'no_token', 'message': 'Brak aktywnego tokenu Facebook'}
|
|
|
|
# 2. Get page_id from SocialMediaConfig
|
|
config = db.query(SocialMediaConfig).filter(
|
|
SocialMediaConfig.platform == 'facebook',
|
|
SocialMediaConfig.company_id == company_id,
|
|
SocialMediaConfig.is_active == True,
|
|
).first()
|
|
|
|
if not config or not config.page_id:
|
|
return {'success': False, 'error': 'no_page', 'message': 'Brak skonfigurowanej strony Facebook'}
|
|
|
|
page_id = config.page_id
|
|
|
|
# 3. Fetch page info from Graph API
|
|
fb = FacebookGraphService(token)
|
|
page_info = fb.get_page_info(page_id)
|
|
|
|
if not page_info:
|
|
return {'success': False, 'error': 'api_failed', 'message': 'Nie udało się pobrać danych strony z Facebook API'}
|
|
|
|
# 4. Fetch page insights (best-effort, may be empty)
|
|
insights = fb.get_page_insights(page_id, 28)
|
|
|
|
# 5. Calculate metrics
|
|
followers = page_info.get('followers_count') or page_info.get('fan_count') or 0
|
|
engaged_users = insights.get('page_engaged_users', 0)
|
|
engagement_rate = None
|
|
if followers > 0 and engaged_users > 0:
|
|
engagement_rate = round((engaged_users / followers) * 100, 2)
|
|
|
|
# Profile completeness: 25 points each for about, website, phone, address
|
|
completeness = 0
|
|
if page_info.get('about'):
|
|
completeness += 25
|
|
if page_info.get('website'):
|
|
completeness += 25
|
|
if page_info.get('phone'):
|
|
completeness += 25
|
|
if page_info.get('single_line_address'):
|
|
completeness += 25
|
|
|
|
# URL: prefer API vanity link, then existing URL, then numeric fallback
|
|
# Don't replace a vanity URL with a numeric one (e.g. facebook.com/inpipl → facebook.com/123456)
|
|
import re
|
|
api_link = page_info.get('link')
|
|
if api_link and re.search(r'facebook\.com/\d+$', api_link):
|
|
api_link = None # Numeric URL, not useful as replacement
|
|
|
|
# 6. Upsert CompanySocialMedia record
|
|
# Look for existing Facebook record for this company (any URL)
|
|
existing = db.query(CompanySocialMedia).filter(
|
|
CompanySocialMedia.company_id == company_id,
|
|
CompanySocialMedia.platform == 'facebook',
|
|
).first()
|
|
|
|
now = datetime.now()
|
|
|
|
if existing:
|
|
csm = existing
|
|
# Only update URL if API returned a proper link (not numeric fallback)
|
|
if api_link:
|
|
csm.url = api_link
|
|
# Keep existing URL if API didn't return link
|
|
else:
|
|
page_url = api_link or f"https://facebook.com/{page_id}"
|
|
csm = CompanySocialMedia(
|
|
company_id=company_id,
|
|
platform='facebook',
|
|
url=page_url,
|
|
)
|
|
db.add(csm)
|
|
|
|
csm.source = 'facebook_api'
|
|
csm.is_valid = True
|
|
csm.check_status = 'ok'
|
|
csm.page_name = page_info.get('name', '')
|
|
csm.followers_count = followers if followers > 0 else csm.followers_count
|
|
csm.has_bio = bool(page_info.get('about'))
|
|
csm.profile_description = (page_info.get('about') or '')[:500]
|
|
if engagement_rate is not None:
|
|
csm.engagement_rate = engagement_rate
|
|
csm.profile_completeness_score = completeness
|
|
csm.verified_at = now
|
|
csm.last_checked_at = now
|
|
|
|
# Store extra API fields in content_types JSONB (no migration needed)
|
|
extra = csm.content_types or {}
|
|
if page_info.get('category'):
|
|
extra['category'] = page_info['category']
|
|
if page_info.get('website'):
|
|
extra['website'] = page_info['website']
|
|
if page_info.get('phone'):
|
|
extra['phone'] = page_info['phone']
|
|
if page_info.get('single_line_address'):
|
|
extra['address'] = page_info['single_line_address']
|
|
csm.content_types = extra
|
|
|
|
# Append to followers_history
|
|
history = csm.followers_history or []
|
|
if followers > 0:
|
|
today_str = now.strftime('%Y-%m-%d')
|
|
# Don't duplicate entries for the same day
|
|
if not history or history[-1].get('date') != today_str:
|
|
history.append({'date': today_str, 'count': followers})
|
|
csm.followers_history = history
|
|
|
|
db.commit()
|
|
|
|
logger.info(f"FB sync OK for company {company_id}: {followers} followers, engagement={engagement_rate}")
|
|
|
|
return {
|
|
'success': True,
|
|
'data': {
|
|
'page_name': csm.page_name,
|
|
'followers_count': followers,
|
|
'engagement_rate': engagement_rate,
|
|
'profile_completeness_score': completeness,
|
|
'source': 'facebook_api',
|
|
}
|
|
}
|