""" Redis service for pub/sub (SSE events) and presence tracking. Uses same Redis instance as Flask-Limiter (localhost:6379/0). """ import json import logging from datetime import datetime, timezone logger = logging.getLogger(__name__) _redis_client = None _redis_available = False def init_redis(app=None): """Connect to Redis, set global _redis_client and _redis_available. Graceful fallback if Redis unavailable. """ global _redis_client, _redis_available try: import redis client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True, socket_connect_timeout=2, socket_timeout=2) client.ping() _redis_client = client _redis_available = True logger.info("Redis service initialized (localhost:6379/0)") except Exception as e: _redis_client = None _redis_available = False logger.warning(f"Redis service unavailable, pub/sub and presence disabled: {e}") def get_redis(): """Return Redis client or None if unavailable.""" return _redis_client def is_available(): """Return True if Redis is available.""" return _redis_available # --------------------------------------------------------------------------- # Pub/Sub helpers # --------------------------------------------------------------------------- def publish_event(user_id, event_type, data): """Publish a JSON event to user:{user_id}:events channel. Args: user_id: Target user ID. event_type: String event type (e.g. 'new_message', 'typing', 'reaction'). data: Dict payload to include in the event. """ if not _redis_available or _redis_client is None: return try: channel = f"user:{user_id}:events" payload = json.dumps({"type": event_type, **data}) _redis_client.publish(channel, payload) except Exception as e: logger.error(f"publish_event failed for user {user_id}: {e}") def publish_to_conversation(db, conversation_id, event_type, data, exclude_user_id=None): """Publish an event to all members of a conversation. Queries ConversationMember for all members of the conversation and calls publish_event for each (except excluded user). Args: db: SQLAlchemy db instance. conversation_id: ID of the conversation. event_type: String event type. data: Dict payload. exclude_user_id: Optional user ID to skip (e.g. the sender). """ if not _redis_available or _redis_client is None: return try: from database import ConversationMember members = db.session.query(ConversationMember).filter_by( conversation_id=conversation_id ).all() for member in members: if exclude_user_id is not None and member.user_id == exclude_user_id: continue publish_event(member.user_id, event_type, data) except Exception as e: logger.error(f"publish_to_conversation failed for conversation {conversation_id}: {e}") # --------------------------------------------------------------------------- # Presence helpers # --------------------------------------------------------------------------- def set_user_online(user_id): """Mark user as online with 60s TTL via SETEX.""" if not _redis_available or _redis_client is None: return try: _redis_client.setex(f"user:{user_id}:online", 60, "1") except Exception as e: logger.error(f"set_user_online failed for user {user_id}: {e}") def is_user_online(user_id): """Return True if user has an active online key in Redis.""" if not _redis_available or _redis_client is None: return False try: return _redis_client.exists(f"user:{user_id}:online") > 0 except Exception as e: logger.error(f"is_user_online failed for user {user_id}: {e}") return False def get_user_last_seen(user_id): """Return the stored last_seen ISO timestamp string, or None.""" if not _redis_available or _redis_client is None: return None try: return _redis_client.get(f"user:{user_id}:last_seen") except Exception as e: logger.error(f"get_user_last_seen failed for user {user_id}: {e}") return None def update_last_seen(user_id): """Set user:{user_id}:last_seen to current UTC ISO timestamp.""" if not _redis_available or _redis_client is None: return try: ts = datetime.now(timezone.utc).isoformat() _redis_client.set(f"user:{user_id}:last_seen", ts) except Exception as e: logger.error(f"update_last_seen failed for user {user_id}: {e}")