Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Previously, the integrations page only read token status from DB without attempting refresh, causing "Token wygasł" after 1h of inactivity despite valid refresh_token. Now get_connected_services() auto-refreshes expired tokens on page load, matching the behavior already present in audit flows. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
355 lines
13 KiB
Python
355 lines
13 KiB
Python
"""
Shared OAuth 2.0 Service for NordaBiz
=====================================

Supports:
- Google OAuth 2.0 (GBP Business Profile API, Search Console API)
- Meta OAuth 2.0 (Facebook Graph API, Instagram Graph API)

Config via environment variables:
- GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET
- META_APP_ID, META_APP_SECRET
- OAUTH_REDIRECT_BASE_URL (e.g., https://nordabiznes.pl)
"""
|
|
|
|
import os
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Tuple
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# OAuth provider configurations.
#
# Keyed by provider name. Each entry holds:
#   auth_url  - endpoint the user's browser is sent to for authorization
#   token_url - endpoint used to obtain / refresh access tokens
#   scopes    - service name -> scope string placed verbatim in the `scope`
#               query parameter of the authorization URL
OAUTH_PROVIDERS = {
    'google': {
        'auth_url': 'https://accounts.google.com/o/oauth2/v2/auth',
        'token_url': 'https://oauth2.googleapis.com/token',
        'scopes': {
            'gbp': 'https://www.googleapis.com/auth/business.manage',
            'search_console': 'https://www.googleapis.com/auth/webmasters.readonly',
        },
    },
    'meta': {
        'auth_url': 'https://www.facebook.com/v21.0/dialog/oauth',
        'token_url': 'https://graph.facebook.com/v21.0/oauth/access_token',
        'scopes': {
            # Meta scope strings are comma-separated permission lists.
            'facebook': 'pages_show_list,pages_read_engagement,read_insights',
            'instagram': 'instagram_basic,instagram_manage_insights,pages_show_list',
        },
    },
}
|
|
|
|
|
|
class OAuthService:
    """Shared OAuth 2.0 service for external API integrations.

    Wraps the authorization-code flow (URL generation, code exchange,
    token refresh) for the providers listed in ``OAUTH_PROVIDERS`` and
    persists tokens through a database session passed in by the caller.

    Every public method reports failure through its return value
    (``None``, ``False`` or ``{}``) and logs the cause; none of them is
    meant to raise to the caller.
    """

    def __init__(self):
        """Read provider credentials and the redirect base URL from the environment."""
        self.google_client_id = os.getenv('GOOGLE_OAUTH_CLIENT_ID')
        self.google_client_secret = os.getenv('GOOGLE_OAUTH_CLIENT_SECRET')
        self.meta_app_id = os.getenv('META_APP_ID')
        self.meta_app_secret = os.getenv('META_APP_SECRET')
        self.redirect_base = os.getenv('OAUTH_REDIRECT_BASE_URL', 'https://nordabiznes.pl')

    def get_authorization_url(self, provider: str, service: str, state: str) -> Optional[str]:
        """Generate OAuth authorization URL for a provider/service.

        Args:
            provider: 'google' or 'meta'
            service: 'gbp', 'search_console', 'facebook', 'instagram'
            state: CSRF state token (include company_id and user_id encoded)

        Returns:
            Authorization URL, or None if the provider is unknown, the
            service has no scopes defined for that provider, or the
            provider's client ID is not configured.
        """
        config = OAUTH_PROVIDERS.get(provider)
        if not config:
            logger.error(f"Unknown OAuth provider: {provider}")
            return None

        # An empty scope would produce a URL the provider can only reject,
        # so treat a provider/service mismatch as an error up front.
        scopes = config['scopes'].get(service)
        if not scopes:
            logger.error(f"Unknown OAuth service for {provider}: {service}")
            return None

        redirect_uri = f"{self.redirect_base}/api/oauth/callback/{provider}"

        if provider == 'google':
            if not self.google_client_id:
                logger.warning("Google OAuth not configured (GOOGLE_OAUTH_CLIENT_ID)")
                return None
            params = {
                'client_id': self.google_client_id,
                'redirect_uri': redirect_uri,
                'response_type': 'code',
                'scope': scopes,
                'state': state,
                # Sent so the token response can include a refresh_token.
                'access_type': 'offline',
                'prompt': 'consent',
            }
        elif provider == 'meta':
            if not self.meta_app_id:
                logger.warning("Meta OAuth not configured (META_APP_ID)")
                return None
            params = {
                'client_id': self.meta_app_id,
                'redirect_uri': redirect_uri,
                'response_type': 'code',
                'scope': scopes,
                'state': state,
            }
        else:
            # Provider exists in OAUTH_PROVIDERS but has no handling here.
            return None

        query = '&'.join(f'{k}={requests.utils.quote(str(v))}' for k, v in params.items())
        return f"{config['auth_url']}?{query}"

    def exchange_code(self, provider: str, code: str) -> Optional[Dict]:
        """Exchange authorization code for access token.

        Args:
            provider: 'google' or 'meta'
            code: Authorization code from callback

        Returns:
            Parsed JSON body of the token response (expected to carry
            access_token and, depending on provider, refresh_token and
            expires_in), or None on any error.
        """
        config = OAUTH_PROVIDERS.get(provider)
        if not config:
            return None

        # Must match the redirect_uri used when building the authorization URL.
        redirect_uri = f"{self.redirect_base}/api/oauth/callback/{provider}"

        if provider == 'google':
            data = {
                'code': code,
                'client_id': self.google_client_id,
                'client_secret': self.google_client_secret,
                'redirect_uri': redirect_uri,
                'grant_type': 'authorization_code',
            }
        elif provider == 'meta':
            data = {
                'code': code,
                'client_id': self.meta_app_id,
                'client_secret': self.meta_app_secret,
                'redirect_uri': redirect_uri,
            }
        else:
            return None

        try:
            response = requests.post(config['token_url'], data=data, timeout=15)
            if response.status_code != 200:
                logger.error(f"OAuth token exchange failed for {provider}: {response.status_code} - {response.text}")
                return None
            return response.json()
        except Exception as e:
            # Network failures and malformed JSON both end up here.
            logger.error(f"OAuth token exchange error for {provider}: {e}")
            return None

    def refresh_access_token(self, provider: str, refresh_token: str) -> Optional[Dict]:
        """Refresh an expired access token.

        Args:
            provider: 'google' or 'meta'
            refresh_token: The refresh token. For 'meta' this value is sent
                as ``fb_exchange_token``.

        Returns:
            Parsed JSON body of the token response (new access_token,
            expires_in, ...) or None on error / unknown provider.
        """
        if provider == 'google':
            data = {
                'client_id': self.google_client_id,
                'client_secret': self.google_client_secret,
                'refresh_token': refresh_token,
                'grant_type': 'refresh_token',
            }
            token_url = OAUTH_PROVIDERS['google']['token_url']
        elif provider == 'meta':
            # Meta long-lived tokens: exchange short-lived for long-lived
            params = {
                'grant_type': 'fb_exchange_token',
                'client_id': self.meta_app_id,
                'client_secret': self.meta_app_secret,
                'fb_exchange_token': refresh_token,
            }
            try:
                response = requests.get(
                    OAUTH_PROVIDERS['meta']['token_url'],
                    params=params,
                    timeout=15,
                )
                if response.status_code == 200:
                    return response.json()
                # Log the failure the same way the Google path does.
                logger.error(f"Token refresh failed for {provider}: {response.status_code}")
                return None
            except Exception as e:
                logger.error(f"Meta token refresh error: {e}")
                return None
        else:
            return None

        try:
            response = requests.post(token_url, data=data, timeout=15)
            if response.status_code == 200:
                return response.json()
            logger.error(f"Token refresh failed for {provider}: {response.status_code}")
            return None
        except Exception as e:
            logger.error(f"Token refresh error for {provider}: {e}")
            return None

    def _try_refresh_token(self, db, token) -> bool:
        """Attempt to refresh an expired token in-place. Returns True on success.

        On success the token row's access_token, updated_at and (when the
        response provides them) expires_at and refresh_token are updated
        and committed. On failure the session is rolled back if an
        exception occurred and False is returned.

        Args:
            db: Database session used to commit / roll back.
            token: Token record exposing provider, service, company_id,
                refresh_token, access_token, expires_at, updated_at.
        """
        if not token.refresh_token:
            return False
        try:
            new_data = self.refresh_access_token(token.provider, token.refresh_token)
            if new_data and new_data.get('access_token'):
                token.access_token = new_data['access_token']

                # The server may rotate the refresh token (RFC 6749, section 6);
                # keep the new one so the next refresh does not use a stale value.
                rotated_refresh_token = new_data.get('refresh_token')
                if rotated_refresh_token:
                    token.refresh_token = rotated_refresh_token

                expires_in = new_data.get('expires_in')
                if expires_in:
                    token.expires_at = datetime.now() + timedelta(seconds=int(expires_in))
                token.updated_at = datetime.now()
                db.commit()
                logger.info(f"Auto-refreshed {token.provider}/{token.service} for company {token.company_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Token auto-refresh error {token.provider}/{token.service}: {e}")
            db.rollback()
            return False

    def save_token(self, db, company_id: int, user_id: int, provider: str,
                   service: str, token_data: Dict) -> bool:
        """Save or update OAuth token in database.

        Args:
            db: Database session
            company_id: Company ID
            user_id: User who authorized
            provider: 'google' or 'meta'
            service: 'gbp', 'search_console', 'facebook', 'instagram'
            token_data: Dict from exchange_code() or refresh_access_token()

        Returns:
            True if saved successfully; False if token_data carries no
            access_token or the database operation failed.
        """
        # Never persist an empty access token flagged as active.
        if not token_data or not token_data.get('access_token'):
            logger.error(f"Error saving OAuth token: no access_token in response for {provider}/{service}")
            return False

        try:
            from database import OAuthToken

            # Find existing token or create new
            token = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                OAuthToken.provider == provider,
                OAuthToken.service == service,
            ).first()

            if not token:
                token = OAuthToken(
                    company_id=company_id,
                    user_id=user_id,
                    provider=provider,
                    service=service,
                )
                db.add(token)

            token.access_token = token_data['access_token']
            # Keep the stored refresh token when the response omits it or
            # sends an empty/null value (e.g. on re-authorization).
            token.refresh_token = token_data.get('refresh_token') or token.refresh_token
            token.token_type = token_data.get('token_type', 'Bearer')

            expires_in = token_data.get('expires_in')
            if expires_in:
                token.expires_at = datetime.now() + timedelta(seconds=int(expires_in))

            token.scopes = OAUTH_PROVIDERS.get(provider, {}).get('scopes', {}).get(service, '')
            token.is_active = True
            token.updated_at = datetime.now()

            db.commit()
            logger.info(f"OAuth token saved: {provider}/{service} for company {company_id}")
            return True

        except Exception as e:
            db.rollback()
            logger.error(f"Error saving OAuth token: {e}")
            return False

    def get_valid_token(self, db, company_id: int, provider: str, service: str) -> Optional[str]:
        """Get a valid access token, refreshing if needed.

        If the stored token is expired and has a refresh token, a refresh
        is attempted; when that refresh fails the token is marked inactive.

        Args:
            db: Database session
            company_id: Company ID
            provider: 'google' or 'meta'
            service: Service name

        Returns:
            Valid access token string or None
        """
        try:
            from database import OAuthToken

            token = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                OAuthToken.provider == provider,
                OAuthToken.service == service,
                # `==` is intentional: this builds a query filter expression.
                OAuthToken.is_active == True,  # noqa: E712
            ).first()

            if not token:
                return None

            # Check if token is expired
            if token.is_expired:
                if not token.refresh_token:
                    return None
                if not self._try_refresh_token(db, token):
                    # Refresh failed: deactivate so it is no longer offered.
                    token.is_active = False
                    db.commit()
                    return None

            return token.access_token

        except Exception as e:
            logger.error(f"Error getting valid token: {e}")
            # Leave the session usable for the caller, matching save_token().
            db.rollback()
            return None

    def get_connected_services(self, db, company_id: int) -> Dict[str, Dict]:
        """Get status of all connected OAuth services for a company.

        Expired tokens that have a refresh token are refreshed on the fly;
        a failed refresh is reported via ``refresh_failed`` and does not
        deactivate the token here.

        Returns:
            Dict keyed by '<provider>/<service>', e.g.
            {'google/gbp': {'connected': True, 'account_name': '...',
            'account_id': ..., 'expires_at': ..., 'is_expired': ...,
            'refresh_failed': ..., 'updated_at': ...}, ...}.
            Empty dict on error.
        """
        try:
            from database import OAuthToken

            tokens = db.query(OAuthToken).filter(
                OAuthToken.company_id == company_id,
                # `==` is intentional: this builds a query filter expression.
                OAuthToken.is_active == True,  # noqa: E712
            ).all()

            result = {}
            for token in tokens:
                key = f"{token.provider}/{token.service}"

                # Auto-refresh expired tokens
                refresh_failed = False
                if token.is_expired and token.refresh_token:
                    if not self._try_refresh_token(db, token):
                        refresh_failed = True

                # Values are read after the refresh attempt so they reflect
                # the refreshed expiry when the refresh succeeded.
                result[key] = {
                    'connected': True,
                    'account_name': token.account_name,
                    'account_id': token.account_id,
                    'expires_at': token.expires_at.isoformat() if token.expires_at else None,
                    'is_expired': token.is_expired,
                    'refresh_failed': refresh_failed,
                    'updated_at': token.updated_at.isoformat() if token.updated_at else None,
                }
            return result

        except Exception as e:
            logger.error(f"Error getting connected services: {e}")
            # Leave the session usable for the caller, matching save_token().
            db.rollback()
            return {}