nordabiz/blueprints/admin/routes_bulk_enrichment.py
Maciej Pienczyn 93e90b2c72
Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
feat: add data quality dashboard, auto-scoring, bulk enrichment and GBP data flow
- Extract 12-field completeness scoring to utils/data_quality.py service
- Auto-update data_quality_score and data_quality label on company data changes
- Add /admin/data-quality dashboard with field coverage stats, quality distribution, and sortable company table
- Add bulk enrichment with background processing, step selection, and progress tracking
- Flow GBP phone/website to Company record when company fields are empty
- Display Google opening hours on public company profile
- Add BulkEnrichmentJob model and migration 075
- Refactor arm_company.py to support selective steps and progress callbacks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 07:02:45 +01:00

194 lines
6.2 KiB
Python

"""
Admin Bulk Enrichment Routes
=============================
Batch enrichment operations for multiple companies at once.
"""
import logging
import threading
import time
from datetime import datetime
from flask import request, jsonify
from flask_login import login_required, current_user
from . import bp
from database import SessionLocal, Company, BulkEnrichmentJob, SystemRole
from utils.decorators import role_required
logger = logging.getLogger(__name__)
def _run_bulk_enrichment(job_id, company_ids, steps):
    """Background worker for bulk enrichment. Runs in a separate thread.

    Args:
        job_id: Primary key of the BulkEnrichmentJob row to update with progress.
        company_ids: List of Company ids to enrich, processed sequentially.
        steps: Enrichment step names forwarded to arm_company().
    """
    import sys
    import os
    # Ensure the project root and scripts/ dir are importable from this
    # worker thread before importing arm_company.
    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    if base_dir not in sys.path:
        sys.path.insert(0, base_dir)
    scripts_dir = os.path.join(base_dir, 'scripts')
    if scripts_dir not in sys.path:
        sys.path.insert(0, scripts_dir)
    from scripts.arm_company import arm_company

    db = SessionLocal()
    try:
        job = db.query(BulkEnrichmentJob).filter_by(id=job_id).first()
        if not job:
            logger.error(f"Bulk enrichment job {job_id} not found")
            return
        all_results = {}
        for i, cid in enumerate(company_ids):
            company = db.query(Company).filter_by(id=cid).first()
            company_name = company.name if company else f"ID {cid}"
            try:
                result = arm_company(cid, force=False, steps=steps)
                if isinstance(result, dict):
                    all_results[str(cid)] = {
                        'name': company_name,
                        'results': result,
                    }
                else:
                    all_results[str(cid)] = {
                        'name': company_name,
                        'results': {'error': 'Firma nie znaleziona' if not result else 'Nieznany błąd'},
                    }
            except Exception as e:
                logger.error(f"Bulk enrichment error for company {cid}: {e}")
                all_results[str(cid)] = {
                    'name': company_name,
                    'results': {'error': str(e)[:200]},
                }
            # Update progress. Assign a fresh copy of the dict: re-assigning
            # the same mutated object compares equal to the previously loaded
            # value, so SQLAlchemy would not mark the JSON column dirty and
            # intermediate results would never reach the database.
            job.processed_companies = i + 1
            job.results = dict(all_results)
            db.commit()
            # Delay between companies to respect API limits
            if i < len(company_ids) - 1:
                time.sleep(2)
        job.status = 'completed'
        job.completed_at = datetime.now()
        db.commit()
        logger.info(f"Bulk enrichment job {job_id} completed: {len(company_ids)} companies")
    except Exception as e:
        logger.error(f"Bulk enrichment job {job_id} failed: {e}")
        # Best-effort: mark the job failed so the status endpoint does not
        # report 'running' forever. Never let this secondary failure raise.
        try:
            job = db.query(BulkEnrichmentJob).filter_by(id=job_id).first()
            if job:
                job.status = 'failed'
                job.completed_at = datetime.now()
                db.commit()
        except Exception:
            logger.exception("Could not mark bulk enrichment job %s as failed", job_id)
    finally:
        db.close()
@bp.route('/data-quality/bulk-enrich', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def bulk_enrich():
    """Start a bulk enrichment job.

    Expects a JSON body: {"company_ids": [...], "steps": [...]}.
    Validates the ids, creates a BulkEnrichmentJob row and launches a
    daemon thread running _run_bulk_enrichment. Returns the job id and
    the number of companies queued.
    """
    # silent=True: malformed/missing JSON yields None instead of Flask's
    # default HTML 400/415 response, keeping all errors in JSON form.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({'error': 'Brak danych'}), 400
    company_ids = data.get('company_ids', [])
    steps = data.get('steps', ['registry', 'seo', 'social', 'gbp', 'logo'])
    # De-duplicate while preserving order: duplicate ids would otherwise
    # make the count-based existence check below fail with an empty
    # "missing" set even though every company exists.
    company_ids = list(dict.fromkeys(company_ids))
    if not company_ids:
        return jsonify({'error': 'Nie wybrano firm'}), 400
    if len(company_ids) > 50:
        return jsonify({'error': 'Maksymalnie 50 firm na raz'}), 400
    valid_steps = {'registry', 'seo', 'social', 'gbp', 'logo'}
    steps = [s for s in steps if s in valid_steps]
    if not steps:
        return jsonify({'error': 'Nie wybrano kroków'}), 400
    db = SessionLocal()
    try:
        # Validate that every requested company exists before creating the job.
        existing = db.query(Company.id).filter(Company.id.in_(company_ids)).all()
        existing_ids = {r[0] for r in existing}
        missing = set(company_ids) - existing_ids
        if missing:
            return jsonify({'error': f'Nie znaleziono firm: {missing}'}), 400
        # Create the job row first so the worker thread can find it by id.
        job = BulkEnrichmentJob(
            started_by=current_user.id,
            total_companies=len(company_ids),
            steps=steps,
            results={},
        )
        db.add(job)
        db.commit()
        job_id = job.id
        # Daemon thread: the job must not block the request or prevent
        # process shutdown.
        thread = threading.Thread(
            target=_run_bulk_enrichment,
            args=(job_id, company_ids, steps),
            daemon=True,
        )
        thread.start()
        logger.info(f"Bulk enrichment job {job_id} started by {current_user.email}: {len(company_ids)} companies, steps={steps}")
        return jsonify({'job_id': job_id, 'total': len(company_ids)})
    finally:
        db.close()
@bp.route('/data-quality/bulk-enrich/status')
@login_required
@role_required(SystemRole.ADMIN)
def bulk_enrich_status():
    """Check status of a bulk enrichment job."""
    job_id = request.args.get('job_id', type=int)
    if not job_id:
        return jsonify({'error': 'Brak job_id'}), 400
    db = SessionLocal()
    try:
        job = db.query(BulkEnrichmentJob).filter_by(id=job_id).first()
        if job is None:
            return jsonify({'error': 'Job nie znaleziony'}), 404
        # Derive a one-line progress message from the most recently
        # processed company (the last entry in the results mapping;
        # relies on dict insertion order being preserved).
        latest_result = None
        if job.results and job.processed_companies > 0:
            for cid, entry in job.results.items():
                label = entry.get('name', cid)
                step_results = entry.get('results', {})
                ok_count = sum(
                    1
                    for v in step_results.values()
                    if isinstance(v, str) and (v.startswith('OK') or v.startswith('SKIP (done)'))
                )
                latest_result = f"{label}: {ok_count}/{len(step_results)} kroków OK"
        # Full per-company results are only exposed once the job is no
        # longer running, to keep the polling payload small.
        payload = {
            'job_id': job.id,
            'status': job.status,
            'processed': job.processed_companies,
            'total': job.total_companies,
            'latest_result': latest_result,
            'results': None if job.status == 'running' else job.results,
        }
        return jsonify(payload)
    finally:
        db.close()