From 5841116ff1a99684866652c2246a310daac42e66 Mon Sep 17 00:00:00 2001 From: Maciej Pienczyn Date: Fri, 27 Mar 2026 13:27:03 +0100 Subject: [PATCH] =?UTF-8?q?feat(messages):=20add=20data=20migration=20scri?= =?UTF-8?q?pt=20(old=20=E2=86=92=20unified=20model)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- scripts/migrate_messages.py | 318 ++++++++++++++++++++++++++++++++++++ 1 file changed, 318 insertions(+) create mode 100644 scripts/migrate_messages.py diff --git a/scripts/migrate_messages.py b/scripts/migrate_messages.py new file mode 100644 index 0000000..7521431 --- /dev/null +++ b/scripts/migrate_messages.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 +""" +Migrate Messages — Old System to Unified Conversations +======================================================= + +Migrates private_messages and message_groups to the unified +Conversation + ConvMessage model. + +Usage: + DATABASE_URL=... python3 scripts/migrate_messages.py [--dry-run] +""" + +import os +import sys +import argparse +import traceback +from datetime import datetime +from collections import defaultdict + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from database import (SessionLocal, PrivateMessage, MessageGroup, MessageGroupMember, + GroupMessage, MessageAttachment, Conversation, ConversationMember, + ConvMessage) + + +def build_content(subject, content): + """Prepend subject as bold line if present.""" + if subject and subject.strip(): + return f"**{subject.strip()}**\n\n{content}" + return content + + +def migrate_private_messages(db, dry_run): + """Phase 1: private_messages → 1:1 conversations.""" + print("\n--- Phase 1: Private Messages → 1:1 Conversations ---") + + messages = ( + db.query(PrivateMessage) + .order_by(PrivateMessage.created_at) + .all() + ) + print(f" Found {len(messages)} private messages") + + # Group by unique user pairs (order-independent) + pairs = defaultdict(list) + for msg in messages: + key = frozenset([msg.sender_id, msg.recipient_id]) + pairs[key].append(msg) + + print(f" Found {len(pairs)} unique 1:1 pairs") + + # old private_message.id → new ConvMessage.id + pm_id_map = {} + convs_created = 0 + msgs_created = 0 + + for pair, pair_msgs in pairs.items(): + pair_msgs.sort(key=lambda m: m.created_at) + first_msg = pair_msgs[0] + user_ids = list(pair) + sender_id = first_msg.sender_id + recipient_id = first_msg.recipient_id + + conv = Conversation( + is_group=False, + owner_id=sender_id, + created_at=first_msg.created_at, + updated_at=first_msg.created_at, + ) + db.add(conv) + db.flush() # get conv.id + + # Create members: owner role for first sender, member for the other + other_id = recipient_id if sender_id in user_ids else sender_id + db.add(ConversationMember( + conversation_id=conv.id, + user_id=sender_id, + role='owner', + joined_at=first_msg.created_at, + )) + db.add(ConversationMember( + conversation_id=conv.id, + user_id=other_id, + role='member', + joined_at=first_msg.created_at, + )) + + # Track last_read_at per recipient from PrivateMessage.read_at + last_read_by = {} + + last_conv_msg = None + for pm in pair_msgs: + full_content = build_content(pm.subject, pm.content) + + # Resolve reply_to_id from old parent_id + reply_to_id = None + if pm.parent_id and pm.parent_id in pm_id_map: + reply_to_id = pm_id_map[pm.parent_id] + + conv_msg = ConvMessage( + conversation_id=conv.id, + sender_id=pm.sender_id, + content=full_content, + reply_to_id=reply_to_id, + created_at=pm.created_at, + ) + db.add(conv_msg) + db.flush() + + pm_id_map[pm.id] = conv_msg.id + last_conv_msg = conv_msg + msgs_created += 1 + + # Track last read_at for the recipient + if pm.is_read and pm.read_at: + current = last_read_by.get(pm.recipient_id) + if current is None or pm.read_at > current: + last_read_by[pm.recipient_id] = pm.read_at + + # Update last_read_at on ConversationMember records + if last_read_by: + for member in db.query(ConversationMember).filter_by(conversation_id=conv.id).all(): + if member.user_id in last_read_by: + member.last_read_at = last_read_by[member.user_id] + + # Set last_message_id and updated_at on conversation + if last_conv_msg: + conv.last_message_id = last_conv_msg.id + conv.updated_at = last_conv_msg.created_at + + convs_created += 1 + + print(f" Created {convs_created} 1:1 conversations, {msgs_created} conv_messages") + return pm_id_map + + +def migrate_groups(db, dry_run): + """Phase 2: message_group + group_message → group conversations.""" + print("\n--- Phase 2: Groups → Group Conversations ---") + + groups = db.query(MessageGroup).order_by(MessageGroup.created_at).all() + print(f" Found {len(groups)} message groups") + + # old group_message.id → new ConvMessage.id + gm_id_map = {} + convs_created = 0 + msgs_created = 0 + + for group in groups: + conv = Conversation( + is_group=True, + name=group.name, + owner_id=group.owner_id, + created_at=group.created_at, + updated_at=group.updated_at or group.created_at, + ) + db.add(conv) + db.flush() + + # Migrate members + for mgm in group.members: + role = mgm.role if mgm.role in ('owner', 'moderator', 'member') else 'member' + db.add(ConversationMember( + conversation_id=conv.id, + user_id=mgm.user_id, + role=role, + last_read_at=mgm.last_read_at, + joined_at=mgm.joined_at or group.created_at, + added_by_id=mgm.added_by_id, + )) + + # Migrate group messages + group_messages = ( + db.query(GroupMessage) + .filter_by(group_id=group.id) + .order_by(GroupMessage.created_at) + .all() + ) + + last_conv_msg = None + for gm in group_messages: + conv_msg = ConvMessage( + conversation_id=conv.id, + sender_id=gm.sender_id, + content=gm.content, + created_at=gm.created_at, + ) + db.add(conv_msg) + db.flush() + + gm_id_map[gm.id] = conv_msg.id + last_conv_msg = conv_msg + msgs_created += 1 + + if last_conv_msg: + conv.last_message_id = last_conv_msg.id + conv.updated_at = last_conv_msg.created_at + + convs_created += 1 + + print(f" Created {convs_created} group conversations, {msgs_created} conv_messages") + return gm_id_map + + +def migrate_attachments(db, pm_id_map, gm_id_map): + """Phase 3: Update message_attachments with conv_message_id.""" + print("\n--- Phase 3: Attachments ---") + + attachments = db.query(MessageAttachment).all() + updated = 0 + skipped = 0 + + for att in attachments: + if att.conv_message_id: + # Already migrated (e.g. from a previous partial run) + skipped += 1 + continue + + if att.message_id and att.message_id in pm_id_map: + att.conv_message_id = pm_id_map[att.message_id] + updated += 1 + elif att.group_message_id and att.group_message_id in gm_id_map: + att.conv_message_id = gm_id_map[att.group_message_id] + updated += 1 + else: + skipped += 1 + + print(f" Updated {updated} attachments, skipped {skipped}") + return updated + + +def validate(db): + """Phase 4: Validation — compare old vs new counts.""" + print("\n--- Phase 4: Validation ---") + + old_pm_count = db.query(PrivateMessage).count() + old_gm_count = db.query(GroupMessage).count() + old_att_count = db.query(MessageAttachment).count() + old_groups_count = db.query(MessageGroup).count() + + new_conv_1to1 = db.query(Conversation).filter_by(is_group=False).count() + new_conv_group = db.query(Conversation).filter_by(is_group=True).count() + + new_msgs_1to1 = ( + db.query(ConvMessage) + .join(Conversation, ConvMessage.conversation_id == Conversation.id) + .filter(Conversation.is_group == False) + .count() + ) + new_msgs_group = ( + db.query(ConvMessage) + .join(Conversation, ConvMessage.conversation_id == Conversation.id) + .filter(Conversation.is_group == True) + .count() + ) + + att_with_conv = db.query(MessageAttachment).filter( + MessageAttachment.conv_message_id != None + ).count() + + print(f" Private messages: old={old_pm_count:>6} → new conv_messages(1:1)={new_msgs_1to1:>6} {'OK' if old_pm_count == new_msgs_1to1 else 'MISMATCH'}") + print(f" Group messages: old={old_gm_count:>6} → new conv_messages(grp)={new_msgs_group:>6} {'OK' if old_gm_count == new_msgs_group else 'MISMATCH'}") + print(f" Message groups: old={old_groups_count:>6} → new conversations(grp)={new_conv_group:>6} {'OK' if old_groups_count == new_conv_group else 'MISMATCH'}") + print(f" Attachments total: {old_att_count:>6} → with conv_message_id={att_with_conv:>6}") + print(f" 1:1 conversations created: {new_conv_1to1}") + + all_ok = (old_pm_count == new_msgs_1to1 and old_gm_count == new_msgs_group + and old_groups_count == new_conv_group) + return all_ok + + +def main(): + parser = argparse.ArgumentParser( + description='Migrate messages from old model to unified Conversation model' + ) + parser.add_argument( + '--dry-run', + action='store_true', + help='Run migration but rollback at the end (no changes saved)' + ) + args = parser.parse_args() + + if args.dry_run: + print("=== DRY RUN MODE — changes will be rolled back ===") + else: + print("=== LIVE MIGRATION — changes will be committed ===") + + db = SessionLocal() + try: + pm_id_map = migrate_private_messages(db, args.dry_run) + gm_id_map = migrate_groups(db, args.dry_run) + migrate_attachments(db, pm_id_map, gm_id_map) + all_ok = validate(db) + + if args.dry_run: + db.rollback() + print("\nDry run complete — rolled back all changes.") + else: + if all_ok: + db.commit() + print("\nMigration complete — all changes committed.") + else: + db.rollback() + print("\nValidation failed — rolled back all changes. Fix issues and retry.") + sys.exit(1) + + except Exception: + db.rollback() + print("\nError during migration — rolled back.") + traceback.print_exc() + sys.exit(1) + finally: + db.close() + + +if __name__ == '__main__': + main()