nordabiz/blueprints/api/routes_oauth.py
Maciej Pienczyn 592ceff30d
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: sync Facebook OAuth stats to company social media profiles
After connecting a FB page via OAuth, automatically fetch page stats
(followers, engagement, bio) from Graph API and persist to
CompanySocialMedia table. Adds manual refresh endpoint and UI badge.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 13:54:57 +01:00

443 lines
16 KiB
Python

"""
OAuth API Routes
================
Endpoints for connecting external accounts (Google, Meta) via OAuth 2.0.
"""
import logging
import secrets
from flask import jsonify, request, redirect, session
from flask_login import login_required, current_user
from . import bp
from database import SessionLocal, OAuthToken
logger = logging.getLogger(__name__)
@bp.route('/oauth/connect/<provider>/<service>', methods=['POST'])
@login_required
def oauth_connect(provider, service):
    """Start the OAuth authorization flow for an external account.

    POST /api/oauth/connect/google/gbp
    POST /api/oauth/connect/meta/facebook

    Responds with JSON containing ``auth_url`` on success. Returns 400 for an
    unsupported provider/service pair, 403 when the current user has no
    company, and 503 when no authorization URL could be produced.
    """
    from oauth_service import OAuthService

    # The only provider/service pairs that may be connected.
    supported_pairs = (
        ('google', 'gbp'),
        ('google', 'search_console'),
        ('meta', 'facebook'),
        ('meta', 'instagram'),
    )
    if (provider, service) not in supported_pairs:
        unknown_msg = f'Nieznany provider/service: {provider}/{service}'
        return jsonify({'success': False, 'error': unknown_msg}), 400

    # Connections are stored per company, so the user must belong to one.
    if not current_user.company_id:
        return jsonify({'success': False, 'error': 'Musisz być przypisany do firmy'}), 403

    # State doubles as the CSRF token: company_id:user_id:service:random.
    # It is kept in the session so the callback can compare it.
    nonce = secrets.token_urlsafe(16)
    state = f"{current_user.company_id}:{current_user.id}:{service}:{nonce}"
    session['oauth_state'] = state

    auth_url = OAuthService().get_authorization_url(provider, service, state)
    if auth_url:
        return jsonify({'success': True, 'auth_url': auth_url})

    not_configured_msg = (
        f'OAuth nie skonfigurowany dla {provider}. Skontaktuj się z administratorem.'
    )
    return jsonify({'success': False, 'error': not_configured_msg}), 503
@bp.route('/oauth/callback/<provider>', methods=['GET'])
@login_required
def oauth_callback(provider):
    """Handle the OAuth callback issued by the provider.

    GET /api/oauth/callback/google?code=...&state=...
    GET /api/oauth/callback/meta?code=...&state=...

    Every outcome is a redirect to ``/konto/integracje`` carrying either an
    ``oauth_success`` or an ``oauth_error`` query parameter.
    """
    from urllib.parse import urlencode

    from oauth_service import OAuthService

    def settings_redirect(key, value):
        """Redirect to the integrations page with one URL-encoded parameter."""
        # The value may come straight from the request (provider "error"),
        # so it is encoded to stop it from injecting extra query parameters.
        # safe='/' keeps the "provider/service" success value unchanged.
        query = urlencode({key: value}, safe='/')
        return redirect(f'/konto/integracje?{query}')

    code = request.args.get('code')
    state = request.args.get('state')
    error = request.args.get('error')

    if error:
        logger.warning(f"OAuth error from {provider}: {error}")
        return settings_redirect('oauth_error', error)

    if not code or not state:
        return settings_redirect('oauth_error', 'missing_params')

    # Validate state against the value stored when the flow was initiated.
    # pop() makes the stored state single-use.
    saved_state = session.pop('oauth_state', None)
    if not saved_state or saved_state != state:
        logger.warning(f"OAuth state mismatch for {provider}")
        return settings_redirect('oauth_error', 'invalid_state')

    # Parse state: company_id:user_id:service:random
    try:
        parts = state.split(':')
        company_id = int(parts[0])
        user_id = int(parts[1])
        service = parts[2]
    except (ValueError, IndexError):
        return settings_redirect('oauth_error', 'invalid_state_format')

    # The logged-in user must be the one who started the flow and must still
    # belong to the same company.
    if current_user.id != user_id or current_user.company_id != company_id:
        return settings_redirect('oauth_error', 'unauthorized')

    # Exchange the authorization code for a token.
    oauth = OAuthService()
    token_data = oauth.exchange_code(provider, code)
    if not token_data:
        return settings_redirect('oauth_error', 'token_exchange_failed')

    # Persist the token; the session is always closed afterwards.
    db = SessionLocal()
    try:
        saved = oauth.save_token(db, company_id, user_id, provider, service, token_data)
    finally:
        db.close()

    if not saved:
        return settings_redirect('oauth_error', 'save_failed')

    logger.info(f"OAuth connected: {provider}/{service} for company {company_id} by user {user_id}")
    return settings_redirect('oauth_success', f'{provider}/{service}')
@bp.route('/oauth/status', methods=['GET'])
@login_required
def oauth_status():
    """Report the OAuth services available to the current user's company.

    GET /api/oauth/status

    The response lists every known service, each flagged as connected or
    not, with account name and expiry details where the connection lookup
    provides them. A user without a company gets an empty mapping.
    """
    from oauth_service import OAuthService

    if not current_user.company_id:
        return jsonify({'success': True, 'services': {}})

    # Catalogue of all services, connected or not.
    catalogue = {
        'google/gbp': {
            'name': 'Google Business Profile',
            'description': 'Pełne dane o wizytówce, opinie, insights',
        },
        'google/search_console': {
            'name': 'Google Search Console',
            'description': 'Zapytania, CTR, pozycje w wyszukiwaniu',
        },
        'meta/facebook': {
            'name': 'Facebook',
            'description': 'Reach, impressions, demographics, post insights',
        },
        'meta/instagram': {
            'name': 'Instagram',
            'description': 'Stories, reels, engagement metrics',
        },
    }

    db = SessionLocal()
    try:
        active = OAuthService().get_connected_services(db, current_user.company_id)

        payload = {}
        for service_key, info in catalogue.items():
            # A missing entry means the service is not connected.
            details = active.get(service_key, {})
            entry = dict(info)
            entry['connected'] = bool(details)
            entry['account_name'] = details.get('account_name')
            entry['expires_at'] = details.get('expires_at')
            entry['is_expired'] = details.get('is_expired', False)
            payload[service_key] = entry

        return jsonify({'success': True, 'services': payload})
    finally:
        db.close()
@bp.route('/oauth/disconnect/<provider>/<service>', methods=['POST'])
@login_required
def oauth_disconnect(provider, service):
    """Deactivate the stored OAuth token for one provider/service pair.

    POST /api/oauth/disconnect/google/gbp

    Returns 403 when the user has no company and 404 when the company has no
    token row for the given pair. The row is flagged inactive, not deleted.
    """
    if not current_user.company_id:
        return jsonify({'success': False, 'error': 'Brak przypisanej firmy'}), 403

    db = SessionLocal()
    try:
        criteria = (
            OAuthToken.company_id == current_user.company_id,
            OAuthToken.provider == provider,
            OAuthToken.service == service,
        )
        token = db.query(OAuthToken).filter(*criteria).first()

        if not token:
            return jsonify({'success': False, 'error': 'Nie znaleziono połączenia'}), 404

        token.is_active = False
        db.commit()
        logger.info(f"OAuth disconnected: {provider}/{service} for company {current_user.company_id}")
        return jsonify({'success': True, 'message': f'{provider}/{service} rozłączony'})
    finally:
        db.close()
@bp.route('/oauth/google/discover-locations', methods=['POST'])
@login_required
def oauth_discover_gbp_locations():
    """List the GBP locations reachable with the company's Google token.

    Returns 403 when the user has no company, 404 when no valid google/gbp
    token is available, 503 when the GBP management module cannot be
    imported, and 500 on any other failure during discovery.
    """
    if not current_user.company_id:
        return jsonify({'success': False, 'error': 'Brak firmy'}), 403

    from oauth_service import OAuthService

    oauth = OAuthService()
    db = SessionLocal()
    try:
        token = oauth.get_valid_token(db, current_user.company_id, 'google', 'gbp')
        if not token:
            return jsonify({'success': False, 'error': 'Brak połączenia GBP'}), 404

        try:
            from gbp_management_service import GBPManagementService

            gbp = GBPManagementService(token)
            # Gather the locations of every account into one flat list.
            locations = [
                location
                for account in gbp.list_accounts()
                for location in gbp.list_locations(account.get('name', ''))
            ]
            return jsonify({'success': True, 'locations': locations})
        except ImportError:
            return jsonify({'success': False, 'error': 'Serwis GBP Management niedostępny'}), 503
        except Exception as e:
            logger.error(f"GBP discover locations error: {e}")
            return jsonify({'success': False, 'error': 'Błąd podczas wyszukiwania lokalizacji'}), 500
    finally:
        db.close()
@bp.route('/oauth/meta/discover-pages', methods=['POST'])
@login_required
def oauth_discover_fb_pages():
    """Discover Facebook pages managed by the user after OAuth connection.

    POST /api/oauth/meta/discover-pages
    Body: {"company_id": 123}

    ``company_id`` is read from the JSON body or, for non-JSON requests,
    from form data. Returns 400 when it is missing, 403 when the current
    user may not edit that company, 404 when the company has no active
    meta/facebook token, and 500 on any failure while listing pages.
    """
    if request.is_json:
        # silent=True plus the dict check turn malformed or non-object JSON
        # into the regular 400 response instead of an unhandled error.
        payload = request.get_json(silent=True)
        company_id = payload.get('company_id') if isinstance(payload, dict) else None
    else:
        company_id = request.form.get('company_id', type=int)

    if not company_id:
        return jsonify({'success': False, 'error': 'company_id jest wymagany'}), 400

    # company_id is client-supplied: without this check any logged-in user
    # could list pages through another company's stored token.
    if not current_user.can_edit_company(company_id):
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        # Need user token (not page token) to list managed pages via me/accounts
        oauth_token = db.query(OAuthToken).filter(
            OAuthToken.company_id == company_id,
            OAuthToken.provider == 'meta',
            OAuthToken.service == 'facebook',
            OAuthToken.is_active == True,  # noqa: E712 - SQLAlchemy column expression
        ).first()
        if not oauth_token:
            return jsonify({'success': False, 'error': 'Brak połączenia Facebook dla tej firmy'}), 404

        # Use user token from metadata if available (page token can't list pages)
        meta = oauth_token.metadata_json or {}
        user_token = meta.get('user_access_token') or oauth_token.access_token

        from facebook_graph_service import FacebookGraphService

        pages = FacebookGraphService(user_token).get_managed_pages()

        # Expose only the display fields; page access tokens are never returned.
        return jsonify({
            'success': True,
            'pages': [
                {
                    'id': p.get('id'),
                    'name': p.get('name'),
                    'category': p.get('category'),
                    'fan_count': p.get('fan_count', 0),
                }
                for p in pages
            ],
        })
    except Exception as e:
        logger.error(f"FB discover pages error: {e}")
        return jsonify({'success': False, 'error': 'Błąd podczas wyszukiwania stron Facebook'}), 500
    finally:
        db.close()
@bp.route('/oauth/meta/select-page', methods=['POST'])
@login_required
def oauth_select_fb_page():
    """Select a Facebook page and save its page access token.

    POST /api/oauth/meta/select-page
    Body: {"company_id": 123, "page_id": "456789"}

    Finds the company's active meta/facebook token, lists the pages
    reachable with the user token and, when ``page_id`` matches one, stores
    the page token, id and name on the OAuthToken row and on the company's
    facebook SocialMediaConfig. After the commit a stats sync is attempted;
    its failure does not change the response.

    Returns 400 for a missing/invalid body or a page without an access
    token, 403 when the user may not edit the company, 404 when there is no
    active connection or no matching page, and 500 on any other error.
    """
    # silent=True plus the dict check route malformed or non-object JSON to
    # the regular JSON 400 response.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'JSON body wymagany'}), 400

    company_id = data.get('company_id')
    page_id = data.get('page_id')
    if not company_id or not page_id:
        return jsonify({'success': False, 'error': 'company_id i page_id są wymagane'}), 400

    # company_id is client-supplied and this endpoint overwrites the
    # company's stored token, so the caller must be allowed to edit it.
    if not current_user.can_edit_company(company_id):
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        # Need user token (not page token) to list pages with their page access tokens
        oauth_token = db.query(OAuthToken).filter(
            OAuthToken.company_id == company_id,
            OAuthToken.provider == 'meta',
            OAuthToken.service == 'facebook',
            OAuthToken.is_active == True,  # noqa: E712 - SQLAlchemy column expression
        ).first()
        if not oauth_token:
            return jsonify({'success': False, 'error': 'Brak połączenia Facebook'}), 404

        # Work on a copy: mutating the loaded dict in place and assigning the
        # same object back is not seen as a change by a plain JSON column
        # (NOTE(review): column type not visible here; the copy is safe
        # either way).
        meta = dict(oauth_token.metadata_json or {})
        user_token = meta.get('user_access_token') or oauth_token.access_token

        from facebook_graph_service import FacebookGraphService

        pages = FacebookGraphService(user_token).get_managed_pages()

        # Find selected page; ids are compared as strings.
        wanted_id = str(page_id)
        selected = next((p for p in pages if str(p.get('id')) == wanted_id), None)
        if not selected:
            return jsonify({'success': False, 'error': 'Nie znaleziono wybranej strony'}), 404

        page_access_token = selected.get('access_token')
        page_name = selected.get('name', '')
        if not page_access_token:
            return jsonify({'success': False, 'error': 'Brak tokenu strony. Sprawdź uprawnienia.'}), 400

        # Preserve user token in metadata for future page discovery, before
        # access_token is overwritten with the page token below.
        if not meta.get('user_access_token'):
            meta['user_access_token'] = oauth_token.access_token
            oauth_token.metadata_json = meta

        # Update oauth_tokens with page-specific data
        oauth_token.access_token = page_access_token
        oauth_token.account_id = wanted_id
        oauth_token.account_name = page_name

        # Create/update social_media_config for this company
        from database import SocialMediaConfig

        config = db.query(SocialMediaConfig).filter(
            SocialMediaConfig.platform == 'facebook',
            SocialMediaConfig.company_id == company_id,
        ).first()
        if not config:
            config = SocialMediaConfig(platform='facebook', company_id=company_id)
            db.add(config)
        config.page_id = wanted_id
        config.page_name = page_name
        config.is_active = True
        # NOTE(review): debug_mode is switched on for every selection -
        # confirm this is the intended default.
        config.debug_mode = True
        config.updated_by = current_user.id

        db.commit()
        logger.info(f"FB page selected: {page_name} (ID: {page_id}) for company {company_id}")

        # Sync Facebook stats to CompanySocialMedia. Best effort: the page
        # selection is already committed, so a sync failure is only logged.
        sync_data = None
        try:
            from facebook_graph_service import sync_facebook_to_social_media

            sync_result = sync_facebook_to_social_media(db, company_id)
            logger.info(f"FB sync after page select: {sync_result}")
            if sync_result.get('success'):
                sync_data = sync_result.get('data')
        except Exception as e:
            logger.error(f"FB sync error (non-blocking): {e}")

        return jsonify({
            'success': True,
            'message': f'Strona "{page_name}" połączona',
            'page': {
                'id': page_id,
                'name': page_name,
                'category': selected.get('category'),
            },
            'sync': sync_data,
        })
    except Exception as e:
        db.rollback()
        logger.error(f"FB select page error: {e}")
        return jsonify({'success': False, 'error': 'Błąd zapisu konfiguracji strony'}), 500
    finally:
        db.close()
@bp.route('/oauth/meta/sync-facebook', methods=['POST'])
@login_required
def oauth_sync_facebook_data():
    """Refresh a company's Facebook page stats on demand.

    POST /api/oauth/meta/sync-facebook
    Body: {"company_id": 123}

    Returns 400 when ``company_id`` is missing, 403 when the current user
    may not edit the company, and 500 when the sync raises. Otherwise the
    sync result is returned as JSON, with status 400 when it reports
    failure.
    """
    payload = request.get_json()
    company_id = payload.get('company_id') if payload else None
    if not company_id:
        return jsonify({'success': False, 'error': 'company_id jest wymagany'}), 400

    # Only users allowed to edit the company may trigger a refresh.
    if not current_user.can_edit_company(company_id):
        return jsonify({'success': False, 'error': 'Brak uprawnień'}), 403

    db = SessionLocal()
    try:
        from facebook_graph_service import sync_facebook_to_social_media

        outcome = sync_facebook_to_social_media(db, company_id)
        status_code = 200 if outcome.get('success') else 400
        return jsonify(outcome), status_code
    except Exception as e:
        logger.error(f"FB manual sync error: {e}")
        return jsonify({'success': False, 'error': 'Błąd synchronizacji danych Facebook'}), 500
    finally:
        db.close()