nordabiz/blueprints/push/routes.py
Maciej Pienczyn 6c4db17807
feat(push): Web Push (VAPID + pywebpush) for private messages
First iteration: the trigger is a new private message. The rollout is
phased through PUSH_USER_WHITELIST in .env: empty = everyone, a list of
user_id values = only the listed users. The same flag controls whether
the bell is visible in the navbar (context_processor inject_push_visibility).

What is included:
- database/migrations/100: push_subscriptions + notify_push_messages
- database.py: PushSubscription model + relationship on User
- blueprints/push/: vapid-public-key, subscribe, unsubscribe, test,
  pending-url (iOS PWA), CSRF exempt, auto-prune of dead subscriptions
  (410/404/403)
- static/sw.js: push + notificationclick (with an iOS fallback through
  /push/pending-url stored in Redis, TTL 5 min)
- static/js/push-client.js: togglePush, iOS detection, ?pushdiag=1
- base.html: bell + script include, gated by push_bell_visible
- message_routes.py: _send_message_push_notifications, run after the emails
- requirements.txt: pywebpush==2.0.3
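
The auto-prune behaviour listed for blueprints/push/ (dropping subscriptions whose push service answers 410, 404 or 403) might look roughly like the sketch below. It is an illustration only: DEAD_STATUS_CODES, partition_results and the (endpoint, status) tuples are hypothetical names that do not come from push_service.py, and the real code presumably deletes rows through the database session rather than returning lists.

```python
# Hypothetical sketch; names are illustrative and not taken from push_service.py.
from typing import Iterable, List, Tuple

# Status codes the commit message lists as meaning "subscription is dead".
DEAD_STATUS_CODES = frozenset({403, 404, 410})


def partition_results(
    results: Iterable[Tuple[str, int]],
) -> Tuple[List[str], List[str]]:
    """Split (endpoint, http_status) pairs into (alive, dead) endpoint lists."""
    alive: List[str] = []
    dead: List[str] = []
    for endpoint, status in results:
        if status in DEAD_STATUS_CODES:
            dead.append(endpoint)   # candidate for removal from push_subscriptions
        else:
            alive.append(endpoint)  # kept, including transient failures such as 5xx
    return alive, dead
```

Keeping subscriptions that failed with other status codes is a design assumption here: a 5xx from the push service is usually transient, so deleting on it would silently unsubscribe users.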

Kill switch: PUSH_KILL_SWITCH=1 stops all sends.
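
The two environment flags described in this message (PUSH_USER_WHITELIST and PUSH_KILL_SWITCH) can be read as one gate in front of every send. The sketch below shows one way to express it. The function names and the comma-separated whitelist format are assumptions; the actual parsing lives in code not shown here.

```python
# Hypothetical sketch of the rollout gate; not the project's actual helper.
from typing import Mapping, Optional, Set


def parse_whitelist(raw: Optional[str]) -> Set[int]:
    """Parse a comma-separated list of user IDs (assumed format)."""
    if not raw:
        return set()
    return {int(part) for part in raw.split(",") if part.strip()}


def should_send_push(user_id: int, env: Mapping[str, str]) -> bool:
    """Return True if a push may be sent to user_id under the given env."""
    if env.get("PUSH_KILL_SWITCH") == "1":
        return False  # kill switch stops every send
    whitelist = parse_whitelist(env.get("PUSH_USER_WHITELIST"))
    # An empty whitelist means the feature is open to everyone.
    return not whitelist or user_id in whitelist
```

For example, with PUSH_USER_WHITELIST="1, 7" only users 1 and 7 pass the gate, and setting PUSH_KILL_SWITCH=1 blocks them as well.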

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:56:49 +02:00

154 lines
4.5 KiB
Python

"""Web Push HTTP endpoints.

GET  /push/vapid-public-key   - public VAPID key for the JS client
POST /push/subscribe          - store the subscription in the DB
POST /push/unsubscribe        - delete the subscription
POST /push/test               - self-push (test of the caller's own subscriptions)
POST /push/pending-url        - (iOS PWA) store the URL from notificationclick
GET  /push/pending-url        - read and clear the pending URL
POST /push/pending-url/clear  - clear the pending URL without returning it
"""
import logging
from datetime import datetime

from flask import jsonify, request
from flask_login import login_required, current_user

from database import SessionLocal, PushSubscription
from extensions import limiter

from . import bp
from .push_service import (
    send_push,
    set_pending_url,
    pop_pending_url,
    _vapid_public_key,
)

logger = logging.getLogger(__name__)


def exempt_from_csrf(app):
    """Exempt push routes from CSRF protection (JS fetch from SW + frontend)."""
    csrf = app.extensions.get('csrf')
    if csrf:
        csrf.exempt(bp)


@bp.route('/vapid-public-key', methods=['GET'])
@login_required
@limiter.exempt
def vapid_public_key():
    key = _vapid_public_key()
    if not key:
        return jsonify({'error': 'VAPID not configured'}), 503
    return jsonify({'key': key})


@bp.route('/subscribe', methods=['POST'])
@login_required
@limiter.limit('30 per minute')
def subscribe():
    payload = request.get_json(silent=True) or {}
    endpoint = (payload.get('endpoint') or '').strip()
    keys = payload.get('keys') or {}
    p256dh = (keys.get('p256dh') or '').strip()
    auth = (keys.get('auth') or '').strip()
    if not endpoint or not p256dh or not auth:
        # "endpoint, p256dh and auth are required"
        return jsonify({'error': 'endpoint, p256dh i auth wymagane'}), 400
    user_agent = (request.headers.get('User-Agent') or '')[:500]
    db = SessionLocal()
    try:
        # Upsert keyed on the endpoint: an existing row is reassigned to the
        # current user and its keys are refreshed.
        existing = db.query(PushSubscription).filter_by(endpoint=endpoint).first()
        if existing:
            existing.user_id = current_user.id
            existing.p256dh = p256dh
            existing.auth = auth
            existing.user_agent = user_agent
            existing.last_used_at = datetime.now()
        else:
            sub = PushSubscription(
                user_id=current_user.id,
                endpoint=endpoint,
                p256dh=p256dh,
                auth=auth,
                user_agent=user_agent,
            )
            db.add(sub)
        db.commit()
        return jsonify({'ok': True})
    except Exception as e:
        db.rollback()
        logger.error("push subscribe error: %s", e)
        # "Error saving the subscription"
        return jsonify({'error': 'Błąd zapisu subskrypcji'}), 500
    finally:
        db.close()


@bp.route('/unsubscribe', methods=['POST'])
@login_required
@limiter.limit('30 per minute')
def unsubscribe():
    payload = request.get_json(silent=True) or {}
    endpoint = (payload.get('endpoint') or '').strip()
    if not endpoint:
        # "endpoint required"
        return jsonify({'error': 'endpoint wymagany'}), 400
    db = SessionLocal()
    try:
        # Only delete a subscription that belongs to the current user.
        sub = db.query(PushSubscription).filter_by(
            endpoint=endpoint, user_id=current_user.id
        ).first()
        if sub:
            db.delete(sub)
            db.commit()
        return jsonify({'ok': True})
    except Exception as e:
        db.rollback()
        logger.error("push unsubscribe error: %s", e)
        # "Error"
        return jsonify({'error': 'Błąd'}), 500
    finally:
        db.close()


@bp.route('/test', methods=['POST'])
@login_required
@limiter.limit('5 per minute')
def test_push():
    stats = send_push(
        user_id=current_user.id,
        # "Notifications are working"
        title='🔔 Powiadomienia działają',
        # "This is a test notification. You will see a popup like this
        # when someone sends you a message."
        body='To testowe powiadomienie. Zobaczysz takie okienko kiedy ktoś napisze do Ciebie wiadomość.',
        url='/wiadomosci',
        tag='push-test',
    )
    return jsonify({'ok': True, **stats})


@bp.route('/pending-url', methods=['POST'])
@login_required
@limiter.limit('60 per minute')
def pending_url_set():
    payload = request.get_json(silent=True) or {}
    url = (payload.get('url') or '').strip()
    # Accept only same-origin paths. Also reject "//host" and "/\host",
    # which browsers resolve as links to another origin.
    if not url.startswith('/') or url.startswith(('//', '/\\')):
        return jsonify({'error': 'URL must be same-origin path'}), 400
    set_pending_url(current_user.id, url)
    return jsonify({'ok': True})


@bp.route('/pending-url', methods=['GET'])
@login_required
@limiter.exempt
def pending_url_get():
    url = pop_pending_url(current_user.id)
    return jsonify({'url': url})


@bp.route('/pending-url/clear', methods=['POST'])
@login_required
@limiter.exempt
def pending_url_clear():
    pop_pending_url(current_user.id)
    return jsonify({'ok': True})
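
The pending-url endpoints rely on set_pending_url and pop_pending_url from push_service, which are not shown in this file. The commit message says the value is kept in Redis with a 5 minute TTL. The sketch below mimics that read-and-clear behaviour with an in-memory dict so it runs without Redis. The class name, the injectable clock and the storage layout are assumptions made for illustration, not the project's implementation.

```python
# Hypothetical in-memory stand-in for the Redis-backed helpers; illustration only.
import time
from typing import Callable, Dict, Optional, Tuple

PENDING_URL_TTL_SECONDS = 300  # 5 minutes, as stated in the commit message


class PendingUrlStore:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # user_id -> (url, expiry timestamp on the injected clock)
        self._items: Dict[int, Tuple[str, float]] = {}

    def set_pending_url(self, user_id: int, url: str) -> None:
        """Remember the URL for this user, overwriting any previous one."""
        self._items[user_id] = (url, self._clock() + PENDING_URL_TTL_SECONDS)

    def pop_pending_url(self, user_id: int) -> Optional[str]:
        """Return the stored URL once, or None if missing or expired."""
        item = self._items.pop(user_id, None)
        if item is None:
            return None
        url, expires_at = item
        return url if self._clock() < expires_at else None
```

Because pop removes the entry in every case, a second GET /push/pending-url after a successful read would return a null URL, which matches the "read and clear" wording in the module docstring.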