"""
Post-Rada Admission Workflow Engine
====================================

Automatically processes board meeting protocols to:
1. Extract admitted companies from proceedings
2. Match them to existing companies in DB
3. Create placeholder profiles for new companies
4. Notify Office Managers about companies needing attention
"""

import html
import logging
import re
import unicodedata
from datetime import datetime

from sqlalchemy import func, text

from database import SessionLocal, BoardMeeting, Company, AdmissionWorkflowLog, User

logger = logging.getLogger(__name__)

# Minimum confidence returned by match_company_by_name() for linking an
# extracted name to an existing company instead of creating a placeholder.
MATCH_CONFIDENCE_THRESHOLD = 0.5

# Shortest name allowed to take part in the substring-containment fallback;
# 1-2 character names would otherwise "match" almost any extracted name.
_MIN_CONTAINMENT_LEN = 3

# TEST MODE: admission summary emails currently go only to this (address, name).
ADMISSION_EMAIL_TEST_RECIPIENT = ('maciej.pienczyn@inpi.pl', 'Maciej Pienczyn')

# Title patterns (tried in order) that carry the candidate's name, e.g.
#   "Prezentacja firmy X — kandydat na członka Izby"
#   "Prezentacja: X – coach/mentoring (kandydatka na członka Izby)"
_TITLE_NAME_PATTERNS = (
    re.compile(r'[Pp]rezentacja\s+firmy\s+(.+?)\s*[—–\-]\s*kandydat'),
    # A plain hyphen only counts as the separator when preceded by whitespace,
    # so hyphenated names such as "Nowak-Kowalska" are not cut short.
    re.compile(r'[Pp]rezentacja:\s+(.+?)(?:\s*[—–]|\s+-)'),
    re.compile(r'[Pp]rezentacja\s+firmy\s+(.+?)$'),
)

# Bulk decision form: "Przyjęto jednogłośnie firmę X jako nowego członka Izby".
# Case-insensitive to agree with the lower-cased keyword filter.
_BULK_DECISION_PATTERN = re.compile(
    r'przyjęt[oa]\s+jednogłośnie\s+firmę\s+(.+?)\s+jako', re.IGNORECASE
)

# Decisions mentioning any of these are not treated as admissions: a moved /
# postponed item ("przeniesion"), the agenda ("program") or the minutes ("protokół").
_NON_ADMISSION_MARKERS = ('przeniesion', 'program', 'protokół', 'protokol')

# Letters that NFKD cannot decompose to ASCII; without this "Złoto" slugs to "zoto".
_SLUG_TRANSLITERATION = str.maketrans(
    {'ł': 'l', 'đ': 'd', 'ø': 'o', 'ß': 'ss', 'æ': 'ae', 'œ': 'oe'}
)


def run_admission_workflow(meeting_id: int, executed_by_user_id: int) -> dict:
    """Run the admission workflow for one board meeting.

    Main entry point, called in a background thread after protocol publish.
    Opens its own session, extracts admitted companies from
    ``meeting.proceedings``, links each one to an existing Company or creates
    a placeholder, notifies managers, and records the outcome in the
    meeting's AdmissionWorkflowLog row (a re-run updates that row).

    Args:
        meeting_id: BoardMeeting primary key.
        executed_by_user_id: user recorded as ``executed_by`` on the log.

    Returns:
        dict with ``'status'`` of ``'completed'``, ``'skipped'`` (plus
        ``'reason'``) or ``'failed'`` (plus ``'error'``); completed runs also
        carry the ``'extracted'`` count and, when anything was extracted, the
        ``'matched'``, ``'created'`` and ``'skipped'`` counts. Exceptions raised
        during processing are logged, rolled back and recorded, not re-raised.
    """
    db = SessionLocal()
    try:
        meeting = db.query(BoardMeeting).filter_by(id=meeting_id).first()
        if not meeting or not meeting.proceedings:
            logger.warning(f"Admission workflow: meeting {meeting_id} not found or no proceedings")
            return {'status': 'skipped', 'reason': 'no proceedings'}

        # Idempotency: a re-run updates this log row instead of adding another.
        existing_log = db.query(AdmissionWorkflowLog).filter_by(meeting_id=meeting_id).first()

        extracted = extract_admitted_companies(meeting.proceedings)
        if not extracted:
            logger.info(f"Admission workflow: no admissions found in meeting {meeting_id}")
            _save_workflow_log(
                db, existing_log, meeting_id, executed_by_user_id,
                extracted_companies=[],
                status='completed',
            )
            db.commit()
            return {'status': 'completed', 'extracted': 0}

        matched, created, skipped = _link_or_create_companies(db, meeting, extracted)
        db.flush()

        notif_count, email_count = notify_office_managers(db, meeting, {
            'extracted': extracted,
            'matched': matched,
            'created': created,
            'skipped': skipped,
        })

        _save_workflow_log(
            db, existing_log, meeting_id, executed_by_user_id,
            extracted_companies=list(extracted),
            matched_companies=matched,
            created_companies=created,
            skipped=skipped,
            notifications_sent=notif_count,
            emails_sent=email_count,
            status='completed',
        )
        db.commit()

        logger.info(
            f"Admission workflow completed for meeting {meeting_id}: "
            f"{len(extracted)} extracted, {len(matched)} matched, "
            f"{len(created)} created, {len(skipped)} skipped"
        )
        return {
            'status': 'completed',
            'extracted': len(extracted),
            'matched': len(matched),
            'created': len(created),
            'skipped': len(skipped),
        }
    except Exception as e:
        logger.error(f"Admission workflow failed for meeting {meeting_id}: {e}", exc_info=True)
        _record_failure(db, meeting_id, executed_by_user_id, e)
        return {'status': 'failed', 'error': str(e)}
    finally:
        db.close()


def _link_or_create_companies(db, meeting, extracted: list) -> tuple:
    """Link each extracted name to a company or create a placeholder.

    Returns a ``(matched, created, skipped)`` tuple of lists of summary dicts.
    Companies already admitted at this meeting are reported as skipped, so
    re-runs and duplicate extractions are not counted twice.
    """
    matched, created, skipped = [], [], []
    for item in extracted:
        name = item['extracted_name']

        # Exact-name company already linked to this meeting (e.g. earlier run).
        already_linked = db.query(Company).filter(
            Company.admitted_at_meeting_id == meeting.id,
            func.lower(Company.name) == func.lower(name),
        ).first()
        if already_linked:
            skipped.append({'name': name, 'reason': 'already_linked', 'company_id': already_linked.id})
            continue

        company, confidence = match_company_by_name(db, name)
        if company and confidence >= MATCH_CONFIDENCE_THRESHOLD:
            # A fuzzy match may resolve to a company this meeting already linked.
            if company.admitted_at_meeting_id == meeting.id:
                skipped.append({'name': name, 'reason': 'already_linked', 'company_id': company.id})
                continue
            link_existing_company(db, company, meeting.id, meeting.meeting_date)
            matched.append({
                'extracted_name': name,
                'matched_id': company.id,
                'matched_name': company.name,
                'confidence': confidence,
            })
        else:
            new_company = create_placeholder_company(db, name, meeting.id, meeting.meeting_date)
            created.append({
                'name': new_company.name,
                'id': new_company.id,
                'slug': new_company.slug,
            })
    return matched, created, skipped


def _save_workflow_log(db, existing_log, meeting_id: int, executed_by_user_id: int, **fields) -> None:
    """Update *existing_log* in place with *fields*, or add a new log row.

    On update, ``error_message`` is cleared unless supplied, so a successful
    re-run does not keep the error of an earlier failed run.
    """
    if existing_log is None:
        db.add(AdmissionWorkflowLog(
            meeting_id=meeting_id,
            executed_by=executed_by_user_id,
            **fields
        ))
        return
    fields.setdefault('error_message', None)
    for attr, value in fields.items():
        setattr(existing_log, attr, value)
    existing_log.executed_at = datetime.now()
    existing_log.executed_by = executed_by_user_id


def _record_failure(db, meeting_id: int, executed_by_user_id: int, error: Exception) -> None:
    """Best effort: roll back and store a 'failed' status for the meeting."""
    try:
        db.rollback()
        # Re-query: the run may have failed before the log was loaded.
        existing_log = db.query(AdmissionWorkflowLog).filter_by(meeting_id=meeting_id).first()
        _save_workflow_log(
            db, existing_log, meeting_id, executed_by_user_id,
            status='failed',
            error_message=str(error),
        )
        db.commit()
    except Exception:
        logger.exception(f"Could not record failed admission workflow for meeting {meeting_id}")


def extract_admitted_companies(proceedings: list) -> list:
    """
    Parse the proceedings JSONB list and return the admission decisions found.

    A decision counts as an admission when its lower-cased text contains both
    "przyjęt" and "jednogłośnie" and none of "przeniesion", "program",
    "protokół"/"protokol".

    The company name is taken from the proceeding title when it matches a
    "Prezentacja ..." pattern (one result per proceeding, paired with the
    first admission decision). Otherwise every admission decision is parsed
    for the bulk form "Przyjęto jednogłośnie firmę X jako ...".

    Returns:
        list of dicts with keys 'title', 'extracted_name', 'decision_text'
        and 'proceeding_index' (position of the proceeding in *proceedings*).
    """
    results = []
    for index, proc in enumerate(proceedings or []):
        if not isinstance(proc, dict):
            continue
        title = proc.get('title') or ''
        decisions = proc.get('decisions') or []
        if isinstance(decisions, str):
            decisions = [decisions]

        admission_decisions = [
            d for d in decisions if isinstance(d, str) and _is_admission_decision(d)
        ]
        if not admission_decisions:
            continue

        title_name = _name_from_title(title)
        if title_name:
            # Single company from title — use first admission decision.
            results.append({
                'title': title,
                'extracted_name': title_name,
                'decision_text': admission_decisions[0],
                'proceeding_index': index,
            })
            continue

        # Bulk admission — extract company names from each decision.
        for decision in admission_decisions:
            match = _BULK_DECISION_PATTERN.search(decision)
            if not match:
                continue
            company_name = match.group(1).strip().rstrip('.')
            if company_name:
                results.append({
                    'title': title,
                    'extracted_name': company_name,
                    'decision_text': decision,
                    'proceeding_index': index,
                })
    return results


def _is_admission_decision(decision: str) -> bool:
    """Return True for a unanimous admission decision that is not excluded."""
    lowered = decision.lower()
    return (
        'przyjęt' in lowered
        and 'jednogłośnie' in lowered
        and not any(marker in lowered for marker in _NON_ADMISSION_MARKERS)
    )


def _name_from_title(title: str):
    """Return the name captured by the first matching title pattern, else None.

    The result may be an empty string when the capture is only dots/spaces;
    callers treat that as "no name".
    """
    for pattern in _TITLE_NAME_PATTERNS:
        match = pattern.search(title)
        if match:
            return match.group(1).strip().rstrip('.')
    return None


def _escape_like(value: str) -> str:
    """Escape LIKE wildcards using '/' as the escape character."""
    return value.replace('/', '//').replace('%', '/%').replace('_', '/_')


def match_company_by_name(db, name: str) -> tuple:
    """
    Try to find an existing company by name.

    Strategies, in order: exact case-insensitive match (1.0), DB name
    containing the extracted name (0.8), substring containment in either
    direction in Python (0.7), then pg_trgm ``similarity`` above 0.5 when the
    extension is available (the similarity value).

    Returns:
        (Company or None, confidence float); ``(None, 0.0)`` for a blank name
        or when nothing matches.
    """
    name = (name or '').strip()
    if not name:
        return (None, 0.0)

    # 1. Exact case-insensitive match
    exact = db.query(Company).filter(
        func.lower(Company.name) == func.lower(name)
    ).first()
    if exact:
        return (exact, 1.0)

    # 2. ILIKE contains match; wildcards in the name are matched literally.
    ilike = db.query(Company).filter(
        Company.name.ilike(f'%{_escape_like(name)}%', escape='/')
    ).first()
    if ilike:
        return (ilike, 0.8)

    # 3. Containment in either direction, mainly a DB name contained in the
    # extracted name (e.g. extracted "Pros Poland Sp. z o.o." vs DB "Pros Poland").
    # Prefer the candidate closest in length (largest overlap); ties by lowest id.
    lowered = name.lower()
    candidates = []
    for c_id, c_name in db.query(Company.id, Company.name).all():
        if not c_name:
            continue
        c_lower = c_name.lower()
        if min(len(c_lower), len(lowered)) < _MIN_CONTAINMENT_LEN:
            continue
        if c_lower in lowered or lowered in c_lower:
            candidates.append((abs(len(c_lower) - len(lowered)), c_id))
    if candidates:
        _, best_id = min(candidates)
        return (db.query(Company).filter_by(id=best_id).first(), 0.7)

    # 4. pg_trgm similarity (if extension available). Run inside a SAVEPOINT:
    # on PostgreSQL a failing statement aborts the whole transaction, which
    # would otherwise break every later query of this workflow run.
    row = None
    try:
        with db.begin_nested():
            row = db.execute(
                text("SELECT id, name, similarity(name, :name) as sim FROM companies WHERE similarity(name, :name) > 0.5 ORDER BY sim DESC LIMIT 1"),
                {'name': name}
            ).first()
    except Exception as e:
        logger.debug(f"pg_trgm similarity lookup unavailable: {e}")
        row = None
    if row:
        company = db.query(Company).filter_by(id=row[0]).first()
        return (company, float(row[2]))

    return (None, 0.0)


def _slugify(name: str) -> str:
    """Lower-case ASCII slug of *name*; falls back to 'firma' if nothing remains."""
    slug = name.lower().strip().translate(_SLUG_TRANSLITERATION)
    slug = unicodedata.normalize('NFKD', slug).encode('ascii', 'ignore').decode('ascii')
    slug = re.sub(r'[^a-z0-9]+', '-', slug).strip('-')
    return slug or 'firma'


def create_placeholder_company(db, name: str, meeting_id: int, meeting_date) -> Company:
    """Create a minimal placeholder company admitted at *meeting_id*.

    The company gets ``status='pending'``, ``data_quality='basic'`` and a slug
    made unique by appending ``-1``, ``-2``, ... The session is flushed so the
    returned object has its ``id``.
    """
    base_slug = _slugify(name)
    slug = base_slug
    counter = 1
    while db.query(Company).filter_by(slug=slug).first():
        slug = f"{base_slug}-{counter}"
        counter += 1

    company = Company(
        name=name,
        slug=slug,
        status='pending',
        data_quality='basic',
        admitted_at_meeting_id=meeting_id,
        member_since=meeting_date,
    )
    db.add(company)
    db.flush()  # Get ID
    logger.info(f"Created placeholder company: {name} (ID {company.id}, slug {slug})")
    return company


def link_existing_company(db, company, meeting_id: int, meeting_date):
    """Link an existing company to a board meeting admission.

    Only fills ``admitted_at_meeting_id`` and ``member_since`` when they are
    empty; values that are already set are left untouched.
    """
    if not company.admitted_at_meeting_id:
        company.admitted_at_meeting_id = meeting_id
    if not company.member_since:
        company.member_since = meeting_date
    logger.info(f"Linked company {company.name} (ID {company.id}) to meeting {meeting_id}")


def notify_office_managers(db, meeting, results: dict) -> tuple:
    """Send in-app notifications and emails to Office Managers.

    Does nothing when *results* has no matched and no created companies.
    Every active ADMIN / OFFICE_MANAGER user gets an in-app notification;
    the summary email currently goes only to ADMISSION_EMAIL_TEST_RECIPIENT.

    Returns:
        (notifications_sent, emails_sent) counts of successful deliveries.
    """
    total = len(results.get('matched', [])) + len(results.get('created', []))
    needs_attention = len(results.get('created', []))
    if total == 0:
        return (0, 0)

    # Find Office Managers and Admins
    managers = db.query(User).filter(
        User.role.in_(['ADMIN', 'OFFICE_MANAGER']),
        User.is_active == True  # noqa: E712
    ).all()

    meeting_id_str = f"{meeting.meeting_number}/{meeting.year}"
    action_url = f"/rada/posiedzenia/{meeting.id}/przyjecia"
    title = f"Rada {meeting_id_str} — nowi członkowie"
    if needs_attention > 0:
        message = f"Przyjęto {total} firm. {needs_attention} wymaga uzupełnienia profilu."
    else:
        message = f"Przyjęto {total} firm. Wszystkie profile są już uzupełnione."

    notif_count = _send_in_app_notifications(managers, meeting, title, message, action_url)

    # TEST MODE: emails go only to the test recipient during testing.
    # TODO: after testing, send to every manager instead:
    #     recipients = [(m.email, m.name) for m in managers if m.email]
    recipients = [ADMISSION_EMAIL_TEST_RECIPIENT]
    email_count = _send_admission_emails(
        recipients, meeting, meeting_id_str, results, total, needs_attention
    )
    return (notif_count, email_count)


def _send_in_app_notifications(managers, meeting, title: str, message: str, action_url: str) -> int:
    """Create one in-app notification per manager; return how many succeeded."""
    try:
        from utils.notifications import create_notification
    except Exception as e:
        logger.error(f"Failed to load notification service: {e}")
        return 0

    sent = 0
    for manager in managers:
        try:
            create_notification(
                user_id=manager.id,
                title=title,
                message=message,
                notification_type='system',
                related_type='board_meeting',
                related_id=meeting.id,
                action_url=action_url
            )
            sent += 1
        except Exception as e:
            logger.error(f"Failed to notify user {manager.id}: {e}")
    return sent


def _send_admission_emails(recipients, meeting, meeting_id_str: str, results: dict,
                           total: int, needs_attention: int) -> int:
    """Email the admission summary to each (address, name); return the sent count."""
    try:
        from email_service import send_email
        body_text, body_html = _build_admission_email(
            meeting, meeting_id_str, results, total, needs_attention
        )
    except Exception as e:
        logger.error(f"Failed to send admission emails: {e}")
        return 0

    subject = f"[NordaBiz] Rada {meeting_id_str} — przyjęto {total} nowych członków"
    sent = 0
    for address, recipient_name in recipients:
        try:
            send_email(
                to=address,
                subject=subject,
                body_text=body_text,
                body_html=body_html,
                email_type='system',
                recipient_name=recipient_name
            )
            sent += 1
        except Exception as e:
            logger.error(f"Failed to email {address}: {e}")
    return sent


def _build_admission_email(meeting, meeting_id_str: str, results: dict,
                           total: int, needs_attention: int) -> tuple:
    """Return ``(body_text, body_html)`` summarising the admitted companies.

    Company names come from protocol text / the DB and are HTML-escaped.
    """
    rows = [(m.get('matched_name'), 'Powiązano z istniejącym profilem')
            for m in results.get('matched', [])]
    rows += [(c.get('name'), 'Nowy profil — wymaga uzupełnienia')
             for c in results.get('created', [])]
    rows_html = ''.join(
        f"<tr><td>{html.escape(company_name or '')}</td><td>{status}</td></tr>"
        for company_name, status in rows
    )
    table_html = (
        '<table border="1" cellpadding="6" cellspacing="0" style="border-collapse: collapse;">'
        f'<tr><th>Firma</th><th>Status</th></tr>{rows_html}</table>'
    )

    date_part = f" ({meeting.meeting_date.strftime('%d.%m.%Y')})" if meeting.meeting_date else ''
    attention_html = (
        f"<p><strong>Uwaga:</strong> {needs_attention} firm wymaga uzupełnienia profilu na portalu.</p>"
        if needs_attention else ''
    )
    body_html = (
        f"<p>Na posiedzeniu Rady {meeting_id_str}{date_part} przyjęto {total} nowych członków.</p>"
        f"{table_html}{attention_html}"
    )
    body_text = f"Na posiedzeniu Rady {meeting_id_str} przyjęto {total} nowych członków. {needs_attention} wymaga uzupełnienia profilu."
    return body_text, body_html