fix: use file-based shared state for enrichment across gunicorn workers
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions

Replace all _enrichment_status in-memory dict references with
file-based shared state (/tmp/nordabiz_enrichment_state.json).
Fixes enrichment appearing instantly complete in multi-worker env.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Maciej Pienczyn 2026-03-12 09:00:05 +01:00
parent cd61c51f91
commit d338e1aad6

View File

@ -761,20 +761,60 @@ _FIELD_LABELS = {
'profile_completeness_score': 'Kompletność profilu',
}
# In-memory staging area — data collected but NOT yet saved to DB
_enrichment_status = {
# File-based shared state for enrichment jobs (works with multi-worker gunicorn)
import json
import tempfile
import os
import fcntl
_ENRICHMENT_STATE_FILE = os.path.join(tempfile.gettempdir(), 'nordabiz_enrichment_state.json')
_ENRICHMENT_DEFAULT = {
'running': False,
'progress': 0,
'total': 0,
'completed': 0,
'errors': 0,
'last_run': None,
'results': [], # Per-company results with before/after diffs
'pending_changes': [], # Changes awaiting approval
'results': [],
'pending_changes': [],
'approved': False,
}
def _read_enrichment_state():
"""Read enrichment state from shared file."""
try:
with open(_ENRICHMENT_STATE_FILE, 'r') as f:
fcntl.flock(f, fcntl.LOCK_SH)
data = json.load(f)
fcntl.flock(f, fcntl.LOCK_UN)
return data
except (FileNotFoundError, json.JSONDecodeError, IOError):
return dict(_ENRICHMENT_DEFAULT)
def _write_enrichment_state(state):
"""Write enrichment state to shared file (atomic)."""
try:
tmp_path = _ENRICHMENT_STATE_FILE + '.tmp'
with open(tmp_path, 'w') as f:
fcntl.flock(f, fcntl.LOCK_EX)
json.dump(state, f, default=str)
fcntl.flock(f, fcntl.LOCK_UN)
os.replace(tmp_path, _ENRICHMENT_STATE_FILE)
except IOError as e:
logger.error(f"Failed to write enrichment state: {e}")
def _update_enrichment_state(**kwargs):
"""Read-modify-write enrichment state."""
state = _read_enrichment_state()
state.update(kwargs)
_write_enrichment_state(state)
return state
def _format_value(key, val):
"""Format a field value for display."""
if val is None:
@ -797,26 +837,30 @@ def _format_value(key, val):
def _run_enrichment_background(company_ids):
"""Collect enrichment data into staging area (NO database writes)."""
"""Collect enrichment data into staging area (NO database writes).
All state is persisted to shared file for multi-worker gunicorn compatibility.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / 'scripts'))
try:
from social_media_audit import SocialProfileEnricher
except ImportError:
logger.error("Could not import SocialProfileEnricher")
_enrichment_status['running'] = False
except ImportError as e:
logger.error(f"Could not import SocialProfileEnricher: {e}")
_update_enrichment_state(running=False, errors=1)
return
enricher = SocialProfileEnricher()
db = SessionLocal()
_enrichment_status['total'] = len(company_ids)
_enrichment_status['completed'] = 0
_enrichment_status['errors'] = 0
_enrichment_status['results'] = []
_enrichment_status['pending_changes'] = []
_enrichment_status['approved'] = False
total = len(company_ids)
completed = 0
errors = 0
results = []
pending_changes = []
_update_enrichment_state(total=total, completed=0, errors=0,
results=[], pending_changes=[], approved=False)
tracked_fields = list(_FIELD_LABELS.keys())
@ -825,6 +869,7 @@ def _run_enrichment_background(company_ids):
try:
company = db.query(Company).filter_by(id=company_id).first()
if not company:
completed += 1
continue
profiles = db.query(CompanySocialMedia).filter(
@ -847,7 +892,6 @@ def _run_enrichment_background(company_ids):
'source': profile.source,
}
# Skip API-sourced profiles
if profile.source in ('facebook_api',):
profile_result['status'] = 'skipped'
profile_result['reason'] = 'Dane z API (wyższy priorytet)'
@ -857,14 +901,12 @@ def _run_enrichment_background(company_ids):
try:
enriched = enricher.enrich_profile(profile.platform, profile.url)
if enriched:
# Build before/after diff
changes = []
for field in tracked_fields:
new_val = enriched.get(field)
if new_val is None:
continue
old_val = getattr(profile, field, None)
# Only record actual changes
if old_val != new_val:
changes.append({
'field': field,
@ -885,7 +927,7 @@ def _run_enrichment_background(company_ids):
if changes:
company_result['has_changes'] = True
_enrichment_status['pending_changes'].append({
pending_changes.append({
'profile_id': profile.id,
'company_id': company_id,
'company_name': company.name,
@ -901,27 +943,50 @@ def _run_enrichment_background(company_ids):
logger.warning(f"Enrichment failed for {company.name}/{profile.platform}: {e}")
profile_result['status'] = 'error'
profile_result['reason'] = str(e)[:150]
_enrichment_status['errors'] += 1
errors += 1
company_result['profiles'].append(profile_result)
_enrichment_status['results'].append(company_result)
results.append(company_result)
except Exception as e:
logger.error(f"Enrichment error for company {company_id}: {e}")
_enrichment_status['errors'] += 1
errors += 1
_enrichment_status['completed'] += 1
_enrichment_status['progress'] = round(
_enrichment_status['completed'] / _enrichment_status['total'] * 100
)
completed += 1
progress = round(completed / total * 100) if total > 0 else 0
# Write state to file after each company (visible to all workers)
_write_enrichment_state({
'running': True,
'total': total,
'completed': completed,
'progress': progress,
'errors': errors,
'results': results,
'pending_changes': pending_changes,
'approved': False,
'last_run': None,
})
except Exception as e:
logger.error(f"Enrichment background thread crashed: {e}")
errors += 1
finally:
db.close()
_enrichment_status['running'] = False
_enrichment_status['last_run'] = datetime.now()
total_changes = len(_enrichment_status['pending_changes'])
logger.info(f"Enrichment scan completed: {_enrichment_status['completed']}/{_enrichment_status['total']}, "
f"{total_changes} pending changes, {_enrichment_status['errors']} errors")
_write_enrichment_state({
'running': False,
'total': total,
'completed': completed,
'progress': 100,
'errors': errors,
'results': results,
'pending_changes': pending_changes,
'approved': False,
'last_run': datetime.now().strftime('%d.%m.%Y %H:%M'),
})
logger.info(f"Enrichment scan completed: {completed}/{total}, "
f"{len(pending_changes)} pending changes, {errors} errors")
@bp.route('/social-audit/run-enrichment', methods=['POST'])
@ -932,12 +997,13 @@ def admin_social_audit_run_enrichment():
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
if _enrichment_status['running']:
state = _read_enrichment_state()
if state.get('running'):
return jsonify({
'error': 'Audyt już działa',
'progress': _enrichment_status['progress'],
'completed': _enrichment_status['completed'],
'total': _enrichment_status['total'],
'progress': state.get('progress', 0),
'completed': state.get('completed', 0),
'total': state.get('total', 0),
}), 409
company_ids_param = request.form.get('company_ids', '')
@ -955,11 +1021,18 @@ def admin_social_audit_run_enrichment():
if not company_ids:
return jsonify({'error': 'Brak firm do audytu'}), 400
_enrichment_status['running'] = True
_enrichment_status['progress'] = 0
_enrichment_status['results'] = []
_enrichment_status['pending_changes'] = []
_enrichment_status['approved'] = False
# Initialize state in shared file before starting thread
_write_enrichment_state({
'running': True,
'progress': 0,
'total': len(company_ids),
'completed': 0,
'errors': 0,
'results': [],
'pending_changes': [],
'approved': False,
'last_run': None,
})
thread = threading.Thread(
target=_run_enrichment_background,
@ -980,8 +1053,9 @@ def admin_social_audit_run_enrichment():
@role_required(SystemRole.OFFICE_MANAGER)
def admin_social_audit_enrichment_status():
"""Get current enrichment job status with live results feed."""
pending = _enrichment_status.get('pending_changes', [])
results = _enrichment_status.get('results', [])
state = _read_enrichment_state()
pending = state.get('pending_changes', [])
results = state.get('results', [])
# Return last N results for live feed (since_index param for incremental updates)
since = request.args.get('since', 0, type=int)
@ -1020,15 +1094,17 @@ def admin_social_audit_enrichment_status():
'profiles': profiles_summary,
})
last_run = state.get('last_run')
return jsonify({
'running': _enrichment_status['running'],
'progress': _enrichment_status['progress'],
'completed': _enrichment_status['completed'],
'total': _enrichment_status['total'],
'errors': _enrichment_status['errors'],
'last_run': _enrichment_status['last_run'].strftime('%d.%m.%Y %H:%M') if _enrichment_status['last_run'] else None,
'running': state.get('running', False),
'progress': state.get('progress', 0),
'completed': state.get('completed', 0),
'total': state.get('total', 0),
'errors': state.get('errors', 0),
'last_run': last_run,
'pending_count': len(pending),
'approved': _enrichment_status.get('approved', False),
'approved': state.get('approved', False),
'feed': feed,
'results_count': len(results),
})
@ -1043,30 +1119,32 @@ def admin_social_audit_enrichment_review():
from flask import abort
abort(404)
if _enrichment_status['running']:
state = _read_enrichment_state()
if state.get('running'):
flash('Audyt wciąż trwa. Poczekaj na zakończenie.', 'warning')
return redirect(url_for('admin.admin_social_audit'))
results = _enrichment_status.get('results', [])
pending = _enrichment_status.get('pending_changes', [])
results = state.get('results', [])
pending = state.get('pending_changes', [])
# Summary stats
total_profiles_scanned = sum(len(r['profiles']) for r in results)
total_profiles_scanned = sum(len(r.get('profiles', [])) for r in results)
profiles_with_changes = len(pending)
profiles_skipped = sum(1 for r in results for p in r['profiles'] if p.get('status') == 'skipped')
profiles_no_data = sum(1 for r in results for p in r['profiles'] if p.get('status') in ('no_data', 'no_changes'))
profiles_errors = sum(1 for r in results for p in r['profiles'] if p.get('status') == 'error')
profiles_skipped = sum(1 for r in results for p in r.get('profiles', []) if p.get('status') == 'skipped')
profiles_no_data = sum(1 for r in results for p in r.get('profiles', []) if p.get('status') in ('no_data', 'no_changes'))
profiles_errors = sum(1 for r in results for p in r.get('profiles', []) if p.get('status') == 'error')
companies_with_changes = len(set(c['company_id'] for c in pending))
summary = {
'total_companies': _enrichment_status.get('total', 0),
'total_companies': state.get('total', 0),
'total_profiles_scanned': total_profiles_scanned,
'profiles_with_changes': profiles_with_changes,
'profiles_skipped': profiles_skipped,
'profiles_no_data': profiles_no_data,
'profiles_errors': profiles_errors,
'companies_with_changes': companies_with_changes,
'last_run': _enrichment_status.get('last_run'),
'last_run': state.get('last_run'),
}
# Only show companies that have changes or errors
@ -1077,7 +1155,7 @@ def admin_social_audit_enrichment_review():
results=results_to_show,
all_results=results,
summary=summary,
approved=_enrichment_status.get('approved', False),
approved=state.get('approved', False),
)
@ -1089,14 +1167,16 @@ def admin_social_audit_enrichment_approve():
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
if _enrichment_status['running']:
state = _read_enrichment_state()
if state.get('running'):
return jsonify({'error': 'Audyt wciąż trwa'}), 409
pending = _enrichment_status.get('pending_changes', [])
pending = state.get('pending_changes', [])
if not pending:
return jsonify({'error': 'Brak oczekujących zmian do zatwierdzenia'}), 400
if _enrichment_status.get('approved'):
if state.get('approved'):
return jsonify({'error': 'Zmiany zostały już zatwierdzone'}), 409
db = SessionLocal()
@ -1130,7 +1210,7 @@ def admin_social_audit_enrichment_approve():
errors += 1
db.commit()
_enrichment_status['approved'] = True
_update_enrichment_state(approved=True)
logger.info(f"Enrichment approved: {applied} profiles updated, {errors} errors")
flash(f'Zatwierdzone: zaktualizowano {applied} profili.', 'success')
@ -1157,10 +1237,9 @@ def admin_social_audit_enrichment_discard():
if not is_audit_owner():
return jsonify({'error': 'Brak uprawnień'}), 403
count = len(_enrichment_status.get('pending_changes', []))
_enrichment_status['pending_changes'] = []
_enrichment_status['results'] = []
_enrichment_status['approved'] = False
state = _read_enrichment_state()
count = len(state.get('pending_changes', []))
_update_enrichment_state(pending_changes=[], results=[], approved=False)
flash(f'Odrzucono {count} oczekujących zmian. Baza danych nie została zmieniona.', 'info')
return jsonify({