Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
Email notifications limited to test address during development. Fixed missing Polish characters in notification titles and body. Commented out production email loop (TODO marker for re-enable). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
409 lines
15 KiB
Python
409 lines
15 KiB
Python
"""
|
||
Post-Rada Admission Workflow Engine
|
||
====================================
|
||
Automatically processes board meeting protocols to:
|
||
1. Extract admitted companies from proceedings
|
||
2. Match them to existing companies in DB
|
||
3. Create placeholder profiles for new companies
|
||
4. Notify Office Managers about companies needing attention
|
||
"""
|
||
|
||
import re
|
||
import logging
|
||
from datetime import datetime
|
||
from database import SessionLocal, BoardMeeting, Company, AdmissionWorkflowLog, User
|
||
from sqlalchemy import func, text
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def run_admission_workflow(meeting_id: int, executed_by_user_id: int) -> dict:
|
||
"""Main entry point. Called in background thread after protocol publish."""
|
||
db = SessionLocal()
|
||
try:
|
||
meeting = db.query(BoardMeeting).filter_by(id=meeting_id).first()
|
||
if not meeting or not meeting.proceedings:
|
||
logger.warning(f"Admission workflow: meeting {meeting_id} not found or no proceedings")
|
||
return {'status': 'skipped', 'reason': 'no proceedings'}
|
||
|
||
# Idempotency: check if already processed
|
||
existing_log = db.query(AdmissionWorkflowLog).filter_by(meeting_id=meeting_id).first()
|
||
|
||
# Extract companies from proceedings
|
||
extracted = extract_admitted_companies(meeting.proceedings)
|
||
if not extracted:
|
||
logger.info(f"Admission workflow: no admissions found in meeting {meeting_id}")
|
||
if not existing_log:
|
||
log = AdmissionWorkflowLog(
|
||
meeting_id=meeting_id,
|
||
executed_by=executed_by_user_id,
|
||
extracted_companies=[],
|
||
status='completed'
|
||
)
|
||
db.add(log)
|
||
db.commit()
|
||
return {'status': 'completed', 'extracted': 0}
|
||
|
||
matched = []
|
||
created = []
|
||
skipped = []
|
||
|
||
for item in extracted:
|
||
name = item['extracted_name']
|
||
|
||
# Skip if already linked to this meeting
|
||
existing_company = db.query(Company).filter(
|
||
Company.admitted_at_meeting_id == meeting_id,
|
||
func.lower(Company.name) == func.lower(name)
|
||
).first()
|
||
if existing_company:
|
||
skipped.append({'name': name, 'reason': 'already_linked', 'company_id': existing_company.id})
|
||
continue
|
||
|
||
# Try to match existing company
|
||
company, confidence = match_company_by_name(db, name)
|
||
|
||
if company and confidence >= 0.5:
|
||
# Link existing company
|
||
link_existing_company(db, company, meeting_id, meeting.meeting_date)
|
||
matched.append({
|
||
'extracted_name': name,
|
||
'matched_id': company.id,
|
||
'matched_name': company.name,
|
||
'confidence': confidence
|
||
})
|
||
else:
|
||
# Create placeholder
|
||
new_company = create_placeholder_company(db, name, meeting_id, meeting.meeting_date)
|
||
created.append({
|
||
'name': new_company.name,
|
||
'id': new_company.id,
|
||
'slug': new_company.slug
|
||
})
|
||
|
||
db.flush()
|
||
|
||
# Send notifications
|
||
notif_count, email_count = notify_office_managers(db, meeting, {
|
||
'extracted': extracted,
|
||
'matched': matched,
|
||
'created': created,
|
||
'skipped': skipped
|
||
})
|
||
|
||
# Log the workflow run
|
||
if existing_log:
|
||
# Update existing log
|
||
existing_log.extracted_companies = [e for e in extracted]
|
||
existing_log.matched_companies = matched
|
||
existing_log.created_companies = created
|
||
existing_log.skipped = skipped
|
||
existing_log.notifications_sent = notif_count
|
||
existing_log.emails_sent = email_count
|
||
existing_log.executed_at = datetime.now()
|
||
existing_log.executed_by = executed_by_user_id
|
||
else:
|
||
log = AdmissionWorkflowLog(
|
||
meeting_id=meeting_id,
|
||
executed_by=executed_by_user_id,
|
||
extracted_companies=[e for e in extracted],
|
||
matched_companies=matched,
|
||
created_companies=created,
|
||
skipped=skipped,
|
||
notifications_sent=notif_count,
|
||
emails_sent=email_count,
|
||
status='completed'
|
||
)
|
||
db.add(log)
|
||
|
||
db.commit()
|
||
|
||
logger.info(
|
||
f"Admission workflow completed for meeting {meeting_id}: "
|
||
f"{len(extracted)} extracted, {len(matched)} matched, "
|
||
f"{len(created)} created, {len(skipped)} skipped"
|
||
)
|
||
|
||
return {
|
||
'status': 'completed',
|
||
'extracted': len(extracted),
|
||
'matched': len(matched),
|
||
'created': len(created),
|
||
'skipped': len(skipped)
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Admission workflow failed for meeting {meeting_id}: {e}", exc_info=True)
|
||
try:
|
||
db.rollback()
|
||
error_log = AdmissionWorkflowLog(
|
||
meeting_id=meeting_id,
|
||
executed_by=executed_by_user_id,
|
||
status='failed',
|
||
error_message=str(e)
|
||
)
|
||
db.add(error_log)
|
||
db.commit()
|
||
except Exception:
|
||
pass
|
||
return {'status': 'failed', 'error': str(e)}
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def extract_admitted_companies(proceedings: list) -> list:
|
||
"""
|
||
Parse proceedings JSONB to find admission decisions.
|
||
|
||
Looks for proceedings where:
|
||
- title matches "Prezentacja firmy X -- kandydat" pattern
|
||
- decisions contain "Przyjeta/Przyjety jednoglosnie" (not "przeniesiona")
|
||
"""
|
||
results = []
|
||
|
||
for i, proc in enumerate(proceedings):
|
||
title = proc.get('title', '')
|
||
decisions = proc.get('decisions', [])
|
||
if isinstance(decisions, str):
|
||
decisions = [decisions]
|
||
|
||
# Collect all admission decisions from this proceeding
|
||
admission_decisions = []
|
||
for d in decisions:
|
||
d_lower = d.lower()
|
||
if ('przyjęt' in d_lower and 'jednogłośnie' in d_lower
|
||
and 'przeniesion' not in d_lower
|
||
and 'program' not in d_lower
|
||
and 'protokół' not in d_lower
|
||
and 'protokol' not in d_lower):
|
||
admission_decisions.append(d)
|
||
|
||
if not admission_decisions:
|
||
continue
|
||
|
||
# Try to extract company name from title first
|
||
# Pattern 1: "Prezentacja firmy X — kandydat na członka Izby"
|
||
# Pattern 2: "Prezentacja: X – coach/mentoring (kandydatka na członka Izby)"
|
||
title_name = None
|
||
for pattern in [
|
||
r'[Pp]rezentacja\s+firmy\s+(.+?)\s*[—–\-]\s*kandydat',
|
||
r'[Pp]rezentacja:\s+(.+?)\s*[—–\-]\s*',
|
||
r'[Pp]rezentacja\s+firmy\s+(.+?)$',
|
||
]:
|
||
match = re.search(pattern, title)
|
||
if match:
|
||
title_name = match.group(1).strip().rstrip('.')
|
||
break
|
||
|
||
if title_name:
|
||
# Single company from title — use first admission decision
|
||
results.append({
|
||
'title': title,
|
||
'extracted_name': title_name,
|
||
'decision_text': admission_decisions[0],
|
||
'proceeding_index': i
|
||
})
|
||
else:
|
||
# Bulk admission — extract company names from each decision
|
||
# Pattern: "Przyjęto jednogłośnie firmę X jako nowego członka Izby"
|
||
for d in admission_decisions:
|
||
match = re.search(r'[Pp]rzyjęt[oa]\s+jednogłośnie\s+firmę\s+(.+?)\s+jako', d)
|
||
if match:
|
||
company_name = match.group(1).strip().rstrip('.')
|
||
results.append({
|
||
'title': title,
|
||
'extracted_name': company_name,
|
||
'decision_text': d,
|
||
'proceeding_index': i
|
||
})
|
||
|
||
return results
|
||
|
||
|
||
def match_company_by_name(db, name: str) -> tuple:
|
||
"""
|
||
Try to find existing company by name.
|
||
Returns (Company or None, confidence float).
|
||
"""
|
||
# 1. Exact case-insensitive match
|
||
exact = db.query(Company).filter(
|
||
func.lower(Company.name) == func.lower(name)
|
||
).first()
|
||
if exact:
|
||
return (exact, 1.0)
|
||
|
||
# 2. ILIKE contains match
|
||
ilike = db.query(Company).filter(
|
||
Company.name.ilike(f'%{name}%')
|
||
).first()
|
||
if ilike:
|
||
return (ilike, 0.8)
|
||
|
||
# 3. Reverse ILIKE (DB name contained in extracted name)
|
||
# e.g. extracted "Prospoland" matches DB "Pros Poland"
|
||
all_companies = db.query(Company.id, Company.name).all()
|
||
for c_id, c_name in all_companies:
|
||
if c_name and (c_name.lower() in name.lower() or name.lower() in c_name.lower()):
|
||
company = db.query(Company).filter_by(id=c_id).first()
|
||
return (company, 0.7)
|
||
|
||
# 4. pg_trgm similarity (if extension available)
|
||
try:
|
||
result = db.execute(
|
||
text("SELECT id, name, similarity(name, :name) as sim FROM companies WHERE similarity(name, :name) > 0.5 ORDER BY sim DESC LIMIT 1"),
|
||
{'name': name}
|
||
).first()
|
||
if result:
|
||
company = db.query(Company).filter_by(id=result[0]).first()
|
||
return (company, float(result[2]))
|
||
except Exception:
|
||
pass
|
||
|
||
return (None, 0.0)
|
||
|
||
|
||
def create_placeholder_company(db, name: str, meeting_id: int, meeting_date) -> Company:
|
||
"""Create a minimal placeholder company."""
|
||
import unicodedata
|
||
|
||
# Generate slug
|
||
slug = name.lower().strip()
|
||
slug = unicodedata.normalize('NFKD', slug).encode('ascii', 'ignore').decode('ascii')
|
||
slug = re.sub(r'[^a-z0-9]+', '-', slug).strip('-')
|
||
|
||
# Ensure unique
|
||
base_slug = slug
|
||
counter = 1
|
||
while db.query(Company).filter_by(slug=slug).first():
|
||
slug = f"{base_slug}-{counter}"
|
||
counter += 1
|
||
|
||
company = Company(
|
||
name=name,
|
||
slug=slug,
|
||
status='pending',
|
||
data_quality='basic',
|
||
admitted_at_meeting_id=meeting_id,
|
||
member_since=meeting_date,
|
||
)
|
||
db.add(company)
|
||
db.flush() # Get ID
|
||
|
||
logger.info(f"Created placeholder company: {name} (ID {company.id}, slug {slug})")
|
||
return company
|
||
|
||
|
||
def link_existing_company(db, company, meeting_id: int, meeting_date):
|
||
"""Link existing company to a board meeting admission."""
|
||
if not company.admitted_at_meeting_id:
|
||
company.admitted_at_meeting_id = meeting_id
|
||
if not company.member_since:
|
||
company.member_since = meeting_date
|
||
logger.info(f"Linked company {company.name} (ID {company.id}) to meeting {meeting_id}")
|
||
|
||
|
||
def notify_office_managers(db, meeting, results: dict) -> tuple:
|
||
"""Send in-app notifications and emails to Office Managers."""
|
||
notif_count = 0
|
||
email_count = 0
|
||
|
||
total = len(results.get('matched', [])) + len(results.get('created', []))
|
||
needs_attention = len(results.get('created', []))
|
||
|
||
if total == 0:
|
||
return (0, 0)
|
||
|
||
# Find Office Managers and Admins
|
||
managers = db.query(User).filter(
|
||
User.role.in_(['ADMIN', 'OFFICE_MANAGER']),
|
||
User.is_active == True # noqa: E712
|
||
).all()
|
||
|
||
meeting_id_str = f"{meeting.meeting_number}/{meeting.year}"
|
||
action_url = f"/rada/posiedzenia/{meeting.id}/przyjecia"
|
||
|
||
title = f"Rada {meeting_id_str} — nowi członkowie"
|
||
if needs_attention > 0:
|
||
message = f"Przyjęto {total} firm. {needs_attention} wymaga uzupełnienia profilu."
|
||
else:
|
||
message = f"Przyjęto {total} firm. Wszystkie profile są już uzupełnione."
|
||
|
||
for manager in managers:
|
||
try:
|
||
from utils.notifications import create_notification
|
||
create_notification(
|
||
user_id=manager.id,
|
||
title=title,
|
||
message=message,
|
||
notification_type='system',
|
||
related_type='board_meeting',
|
||
related_id=meeting.id,
|
||
action_url=action_url
|
||
)
|
||
notif_count += 1
|
||
except Exception as e:
|
||
logger.error(f"Failed to notify user {manager.id}: {e}")
|
||
|
||
# Send email to managers
|
||
try:
|
||
from email_service import send_email
|
||
|
||
# Build HTML table
|
||
rows = []
|
||
for m in results.get('matched', []):
|
||
rows.append(f"<tr><td>{m['matched_name']}</td><td style='color:green;'>✓ Profil istnieje</td></tr>")
|
||
for c in results.get('created', []):
|
||
rows.append(f"<tr><td>{c['name']}</td><td style='color:orange;'>⏳ Wymaga uzupełnienia</td></tr>")
|
||
|
||
table_html = f"""
|
||
<table style="width:100%; border-collapse:collapse; margin:16px 0;">
|
||
<thead><tr style="background:#f3f4f6;"><th style="text-align:left;padding:8px;">Firma</th><th style="text-align:left;padding:8px;">Status</th></tr></thead>
|
||
<tbody>{''.join(rows)}</tbody>
|
||
</table>
|
||
"""
|
||
|
||
body_html = f"""
|
||
<p>Na posiedzeniu Rady <strong>{meeting_id_str}</strong> ({meeting.meeting_date.strftime('%d.%m.%Y')})
|
||
przyjęto <strong>{total}</strong> nowych członków.</p>
|
||
{table_html}
|
||
{'<p><strong>Uwaga:</strong> ' + str(needs_attention) + ' firm wymaga uzupełnienia profilu na portalu.</p>' if needs_attention else ''}
|
||
<p><a href="https://nordabiznes.pl{action_url}" style="display:inline-block;padding:10px 20px;background:#2563eb;color:white;border-radius:8px;text-decoration:none;font-weight:600;">Przejdź do dashboardu przyjęć</a></p>
|
||
"""
|
||
|
||
body_text = f"Na posiedzeniu Rady {meeting_id_str} przyjęto {total} nowych członków. {needs_attention} wymaga uzupełnienia profilu."
|
||
|
||
# TEST MODE: send only to maciej.pienczyn@inpi.pl during testing
|
||
test_email = 'maciej.pienczyn@inpi.pl'
|
||
try:
|
||
send_email(
|
||
to=test_email,
|
||
subject=f"[NordaBiz] Rada {meeting_id_str} — przyjęto {total} nowych członków",
|
||
body_text=body_text,
|
||
body_html=body_html,
|
||
email_type='system',
|
||
recipient_name='Maciej Pienczyn'
|
||
)
|
||
email_count = 1
|
||
except Exception as e:
|
||
logger.error(f"Failed to send test email: {e}")
|
||
|
||
# TODO: Po zakończeniu testów — odkomentować pętlę poniżej i usunąć test_email
|
||
# for manager in managers:
|
||
# if manager.email:
|
||
# try:
|
||
# send_email(
|
||
# to=manager.email,
|
||
# subject=f"[NordaBiz] Rada {meeting_id_str} — przyjęto {total} nowych członków",
|
||
# body_text=body_text,
|
||
# body_html=body_html,
|
||
# email_type='system',
|
||
# recipient_name=manager.name
|
||
# )
|
||
# email_count += 1
|
||
# except Exception as e:
|
||
# logger.error(f"Failed to email {manager.email}: {e}")
|
||
except Exception as e:
|
||
logger.error(f"Failed to send admission emails: {e}")
|
||
|
||
return (notif_count, email_count)
|