nordabiz/scripts/migrate_messages.py
Maciej Pienczyn 51f041baa4
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
fix(messages): use HTML strong tag instead of markdown, remove reply_to mapping from migration
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 13:43:21 +01:00

320 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Migrate Messages — Old System to Unified Conversations
=======================================================
Migrates private_messages and message_groups to the unified
Conversation + ConvMessage model.
Usage:
DATABASE_URL=... python3 scripts/migrate_messages.py [--dry-run]
"""
import os
import sys
import argparse
import traceback
from datetime import datetime
from collections import defaultdict
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from database import (SessionLocal, PrivateMessage, MessageGroup, MessageGroupMember,
GroupMessage, MessageAttachment, Conversation, ConversationMember,
ConvMessage)
def build_content(subject, content):
    """Return the message body with the legacy subject prepended as a bold line.

    Args:
        subject: Subject of the old message; may be None, empty or blank.
            Assumed to be plain text (TODO confirm against the old model).
        content: Body of the old message; may be None.

    Returns:
        ``content`` unchanged when there is no usable subject; otherwise
        ``<p><strong>subject</strong></p>`` followed by the body.
    """
    import html  # function-scope import keeps this helper self-contained

    if not subject or not subject.strip():
        return content

    # The subject ends up inside HTML markup, so characters such as "<" and
    # "&" must be escaped or they would be interpreted as markup.
    heading = html.escape(subject.strip(), quote=False)
    # A missing body must not be rendered as the literal text "None".
    return f"<p><strong>{heading}</strong></p>{content or ''}"
def migrate_private_messages(db, dry_run):
    """Phase 1: private_messages → 1:1 conversations.

    Groups every PrivateMessage by its unordered (sender, recipient) pair and
    creates one non-group Conversation per pair, two ConversationMember rows
    (one for a self-message) and one ConvMessage per original message.

    Args:
        db: Open database session. Rows are added and flushed here; the
            caller decides whether to commit or roll back.
        dry_run: Accepted for interface parity with the caller; not used here.

    Returns:
        dict mapping old private_message.id → new ConvMessage.id.
    """
    print("\n--- Phase 1: Private Messages → 1:1 Conversations ---")

    messages = (
        db.query(PrivateMessage)
        .order_by(PrivateMessage.created_at)
        .all()
    )
    print(f" Found {len(messages)} private messages")

    # Group by unique user pairs (order-independent)
    pairs = defaultdict(list)
    for msg in messages:
        pairs[frozenset([msg.sender_id, msg.recipient_id])].append(msg)
    print(f" Found {len(pairs)} unique 1:1 pairs")

    # old private_message.id → new ConvMessage.id
    pm_id_map = {}
    convs_created = 0
    msgs_created = 0

    for pair_msgs in pairs.values():
        # The query above is ordered by created_at and grouping preserves that
        # order, so each bucket is already chronological (no re-sort needed).
        first_msg = pair_msgs[0]
        sender_id = first_msg.sender_id
        recipient_id = first_msg.recipient_id

        conv = Conversation(
            is_group=False,
            owner_id=sender_id,
            created_at=first_msg.created_at,
            updated_at=first_msg.created_at,
        )
        db.add(conv)
        db.flush()  # get conv.id

        # Members keyed by user id: the first sender owns the conversation,
        # the other participant is a plain member.
        members = {
            sender_id: ConversationMember(
                conversation_id=conv.id,
                user_id=sender_id,
                role='owner',
                joined_at=first_msg.created_at,
            ),
        }
        # Skip duplicate if sender == recipient (self-message)
        if recipient_id != sender_id:
            members[recipient_id] = ConversationMember(
                conversation_id=conv.id,
                user_id=recipient_id,
                role='member',
                joined_at=first_msg.created_at,
            )
        for member in members.values():
            db.add(member)

        # Latest read timestamp per recipient, derived from PrivateMessage.read_at
        last_read_by = {}
        last_conv_msg = None

        for pm in pair_msgs:
            # Note: old parent_id was thread-root (email-style), not a specific quote.
            # Don't map to reply_to_id — it would create visual clutter with every
            # reply quoting the same root message.
            conv_msg = ConvMessage(
                conversation_id=conv.id,
                sender_id=pm.sender_id,
                content=build_content(pm.subject, pm.content),
                reply_to_id=None,
                created_at=pm.created_at,
            )
            db.add(conv_msg)
            db.flush()  # get conv_msg.id
            pm_id_map[pm.id] = conv_msg.id
            last_conv_msg = conv_msg
            msgs_created += 1

            if pm.is_read:
                # A message flagged as read but lacking read_at was still
                # read: fall back to its own timestamp so the read state is
                # not lost in the migration.
                read_at = pm.read_at or pm.created_at
                if read_at is not None:
                    current = last_read_by.get(pm.recipient_id)
                    if current is None or read_at > current:
                        last_read_by[pm.recipient_id] = read_at

        # Apply last_read_at directly to the member rows created above
        for user_id, read_at in last_read_by.items():
            member = members.get(user_id)
            if member is not None:
                member.last_read_at = read_at

        # Set last_message_id and updated_at on conversation
        if last_conv_msg:
            conv.last_message_id = last_conv_msg.id
            conv.updated_at = last_conv_msg.created_at

        convs_created += 1

    print(f" Created {convs_created} 1:1 conversations, {msgs_created} conv_messages")
    return pm_id_map
def migrate_groups(db, dry_run):
    """Phase 2: message_group + group_message → group conversations.

    For every MessageGroup this creates one group Conversation, copies its
    members into ConversationMember rows and its GroupMessage rows into
    ConvMessage rows.

    Args:
        db: Open database session; rows are added and flushed, not committed.
        dry_run: Accepted for interface parity with the caller; not used here.

    Returns:
        dict mapping old group_message.id → new ConvMessage.id.
    """
    print("\n--- Phase 2: Groups → Group Conversations ---")

    legacy_groups = db.query(MessageGroup).order_by(MessageGroup.created_at).all()
    print(f" Found {len(legacy_groups)} message groups")

    known_roles = ('owner', 'moderator', 'member')
    gm_id_map = {}  # old group_message.id → new ConvMessage.id
    conversation_total = 0
    message_total = 0

    for legacy_group in legacy_groups:
        conversation = Conversation(
            is_group=True,
            name=legacy_group.name,
            owner_id=legacy_group.owner_id,
            created_at=legacy_group.created_at,
            updated_at=legacy_group.updated_at or legacy_group.created_at,
        )
        db.add(conversation)
        db.flush()  # assigns conversation.id

        # Copy memberships; any role outside the known set becomes 'member'.
        for membership in legacy_group.members:
            if membership.role in known_roles:
                new_role = membership.role
            else:
                new_role = 'member'
            db.add(ConversationMember(
                conversation_id=conversation.id,
                user_id=membership.user_id,
                role=new_role,
                last_read_at=membership.last_read_at,
                joined_at=membership.joined_at or legacy_group.created_at,
                added_by_id=membership.added_by_id,
            ))

        # Copy the group's messages in chronological order.
        legacy_messages = (
            db.query(GroupMessage)
            .filter_by(group_id=legacy_group.id)
            .order_by(GroupMessage.created_at)
            .all()
        )
        newest = None
        for legacy_message in legacy_messages:
            newest = ConvMessage(
                conversation_id=conversation.id,
                sender_id=legacy_message.sender_id,
                content=legacy_message.content,
                created_at=legacy_message.created_at,
            )
            db.add(newest)
            db.flush()  # assigns newest.id
            gm_id_map[legacy_message.id] = newest.id
            message_total += 1

        # Point the conversation at its most recent message, if it has one.
        if newest:
            conversation.last_message_id = newest.id
            conversation.updated_at = newest.created_at

        conversation_total += 1

    print(f" Created {conversation_total} group conversations, {message_total} conv_messages")
    return gm_id_map
def migrate_attachments(db, pm_id_map, gm_id_map):
    """Phase 3: point message_attachments at the new conv_message rows.

    Args:
        db: Open database session; attachment rows are modified in place.
        pm_id_map: old private_message.id → new ConvMessage.id.
        gm_id_map: old group_message.id → new ConvMessage.id.

    Returns:
        Number of attachments that received a conv_message_id.
    """
    print("\n--- Phase 3: Attachments ---")

    linked = 0
    ignored = 0

    for attachment in db.query(MessageAttachment).all():
        # Already migrated (e.g. from a previous partial run)
        if attachment.conv_message_id:
            ignored += 1
            continue

        old_pm_id = attachment.message_id
        if old_pm_id and old_pm_id in pm_id_map:
            attachment.conv_message_id = pm_id_map[old_pm_id]
            linked += 1
            continue

        old_gm_id = attachment.group_message_id
        if old_gm_id and old_gm_id in gm_id_map:
            attachment.conv_message_id = gm_id_map[old_gm_id]
            linked += 1
            continue

        # No migrated message corresponds to this attachment.
        ignored += 1

    print(f" Updated {linked} attachments, skipped {ignored}")
    return linked
def validate(db):
    """Phase 4: Validation — compare row counts of the old and new models.

    Prints a summary and checks that the number of private messages, group
    messages and message groups equals the number of migrated 1:1 messages,
    group messages and group conversations respectively. Attachment and 1:1
    conversation counts are printed for information only.

    Args:
        db: Open database session used for the count queries.

    Returns:
        True when all three compared counts match, otherwise False.
    """
    print("\n--- Phase 4: Validation ---")

    def count_conv_messages(group_flag):
        # Number of ConvMessage rows whose conversation has the given is_group value.
        return (
            db.query(ConvMessage)
            .join(Conversation, ConvMessage.conversation_id == Conversation.id)
            .filter(Conversation.is_group == group_flag)
            .count()
        )

    def verdict(matches):
        return 'OK' if matches else 'MISMATCH'

    # Old model
    pm_total = db.query(PrivateMessage).count()
    gm_total = db.query(GroupMessage).count()
    attachment_total = db.query(MessageAttachment).count()
    group_total = db.query(MessageGroup).count()

    # New model
    direct_convs = db.query(Conversation).filter_by(is_group=False).count()
    group_convs = db.query(Conversation).filter_by(is_group=True).count()
    direct_msgs = count_conv_messages(False)
    group_msgs = count_conv_messages(True)
    linked_attachments = db.query(MessageAttachment).filter(
        MessageAttachment.conv_message_id != None  # noqa: E711 — SQL "IS NOT NULL"
    ).count()

    pm_ok = pm_total == direct_msgs
    gm_ok = gm_total == group_msgs
    groups_ok = group_total == group_convs

    print(f" Private messages: old={pm_total:>6} → new conv_messages(1:1)={direct_msgs:>6} {verdict(pm_ok)}")
    print(f" Group messages: old={gm_total:>6} → new conv_messages(grp)={group_msgs:>6} {verdict(gm_ok)}")
    print(f" Message groups: old={group_total:>6} → new conversations(grp)={group_convs:>6} {verdict(groups_ok)}")
    print(f" Attachments total: {attachment_total:>6} → with conv_message_id={linked_attachments:>6}")
    print(f" 1:1 conversations created: {direct_convs}")

    return pm_ok and gm_ok and groups_ok
def main():
    """Parse arguments and run all migration phases in a single transaction.

    With ``--dry-run`` every change is rolled back at the end. Otherwise the
    changes are committed only when validation succeeds. The process exits
    with status 1 when validation fails (in either mode) or when any phase
    raises an exception.
    """
    parser = argparse.ArgumentParser(
        description='Migrate messages from old model to unified Conversation model'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Run migration but rollback at the end (no changes saved)'
    )
    args = parser.parse_args()

    if args.dry_run:
        print("=== DRY RUN MODE — changes will be rolled back ===")
    else:
        print("=== LIVE MIGRATION — changes will be committed ===")

    db = SessionLocal()
    try:
        pm_id_map = migrate_private_messages(db, args.dry_run)
        gm_id_map = migrate_groups(db, args.dry_run)
        migrate_attachments(db, pm_id_map, gm_id_map)
        all_ok = validate(db)

        if args.dry_run:
            db.rollback()
            print("\nDry run complete — rolled back all changes.")
            if not all_ok:
                # Surface the validation result so a dry run can be used as a
                # pre-flight check instead of always reporting success.
                print("Validation failed — a live run would be rolled back. Fix issues and retry.")
                sys.exit(1)
        elif all_ok:
            db.commit()
            print("\nMigration complete — all changes committed.")
        else:
            db.rollback()
            print("\nValidation failed — rolled back all changes. Fix issues and retry.")
            sys.exit(1)
    except Exception:
        # sys.exit() raises SystemExit, which is not an Exception subclass, so
        # the exits above pass through here untouched.
        db.rollback()
        print("\nError during migration — rolled back.")
        traceback.print_exc()
        sys.exit(1)
    finally:
        db.close()


# Run the migration only when executed as a script, not when imported.
if __name__ == '__main__':
    main()