Some checks are pending
NordaBiz Tests / Unit & Integration Tests (push) Waiting to run
NordaBiz Tests / E2E Tests (Playwright) (push) Blocked by required conditions
NordaBiz Tests / Smoke Tests (Production) (push) Blocked by required conditions
NordaBiz Tests / Send Failure Notification (push) Blocked by required conditions
- Extract 12-field completeness scoring to utils/data_quality.py service - Auto-update data_quality_score and data_quality label on company data changes - Add /admin/data-quality dashboard with field coverage stats, quality distribution, and sortable company table - Add bulk enrichment with background processing, step selection, and progress tracking - Flow GBP phone/website to Company record when company fields are empty - Display Google opening hours on public company profile - Add BulkEnrichmentJob model and migration 075 - Refactor arm_company.py to support selective steps and progress callbacks Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
194 lines
6.2 KiB
Python
194 lines
6.2 KiB
Python
"""
|
|
Admin Bulk Enrichment Routes
|
|
=============================
|
|
|
|
Batch enrichment operations for multiple companies at once.
|
|
"""
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from flask import request, jsonify
|
|
from flask_login import login_required, current_user
|
|
|
|
from . import bp
|
|
from database import SessionLocal, Company, BulkEnrichmentJob, SystemRole
|
|
from utils.decorators import role_required
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _run_bulk_enrichment(job_id, company_ids, steps):
    """Background worker for bulk enrichment. Runs in a separate thread.

    Processes the companies one by one with ``arm_company`` and, after each
    one, stores the progress counter and the accumulated per-company results
    on the ``BulkEnrichmentJob`` row so the status endpoint can report them.

    Args:
        job_id: ID of the ``BulkEnrichmentJob`` row to update.
        company_ids: Sequence of company IDs, processed in the given order.
        steps: Step names passed through to ``arm_company(..., steps=steps)``.

    Returns:
        None. The outcome is written to the job row: ``status`` becomes
        ``'completed'`` on success or ``'failed'`` if the run aborts.
    """
    import os
    import sys

    db = SessionLocal()
    try:
        # Make the project root and its scripts/ directory importable.
        # scripts/ is inserted last so it ends up first on sys.path.
        base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
        scripts_dir = os.path.join(base_dir, 'scripts')
        for path in (base_dir, scripts_dir):
            if path not in sys.path:
                sys.path.insert(0, path)

        # Imported inside the try block so that an import failure marks the
        # job as failed instead of silently killing the thread.
        from scripts.arm_company import arm_company

        job = db.query(BulkEnrichmentJob).filter_by(id=job_id).first()
        if not job:
            logger.error(f"Bulk enrichment job {job_id} not found")
            return

        all_results = {}
        last_index = len(company_ids) - 1

        for i, cid in enumerate(company_ids):
            company = db.query(Company).filter_by(id=cid).first()
            company_name = company.name if company else f"ID {cid}"

            try:
                result = arm_company(cid, force=False, steps=steps)
                if isinstance(result, dict):
                    outcome = result
                else:
                    # A falsy non-dict result is reported as "company not
                    # found", anything else as "unknown error".
                    outcome = {'error': 'Firma nie znaleziona' if not result else 'Nieznany błąd'}
            except Exception as e:
                # One failing company must not abort the whole batch.
                logger.error(f"Bulk enrichment error for company {cid}: {e}")
                outcome = {'error': str(e)[:200]}

            all_results[str(cid)] = {
                'name': company_name,
                'results': outcome,
            }

            # Update progress. A fresh dict is assigned each time so the ORM
            # is handed a new value rather than the same mutated object
            # (NOTE(review): matters if the session does not expire
            # attributes on commit and the column is a plain JSON type).
            job.processed_companies = i + 1
            job.results = dict(all_results)
            db.commit()

            # Delay between companies to respect API limits
            if i < last_index:
                time.sleep(2)

        job.status = 'completed'
        job.completed_at = datetime.now()
        db.commit()
        logger.info(f"Bulk enrichment job {job_id} completed: {len(company_ids)} companies")

    except Exception as e:
        logger.exception(f"Bulk enrichment job {job_id} failed: {e}")
        try:
            # The session may be stuck in a failed transaction (e.g. after a
            # commit error); roll back before using it again.
            db.rollback()
            job = db.query(BulkEnrichmentJob).filter_by(id=job_id).first()
            if job:
                job.status = 'failed'
                job.completed_at = datetime.now()
                db.commit()
        except Exception:
            # Best effort only, but leave a trace instead of swallowing it.
            logger.exception(f"Could not mark bulk enrichment job {job_id} as failed")
    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/data-quality/bulk-enrich', methods=['POST'])
@login_required
@role_required(SystemRole.ADMIN)
def bulk_enrich():
    """Start a bulk enrichment job.

    Expects a JSON object with:
        company_ids: non-empty list of integer company IDs (max 50 after
            duplicates are removed).
        steps: optional list of step names; unknown names are ignored.
            Defaults to all of registry, seo, social, gbp, logo.

    Returns:
        JSON ``{'job_id': ..., 'total': ...}`` once the job row is created
        and the background thread is started, or a JSON ``{'error': ...}``
        with status 400 when the request is invalid.
    """
    data = request.get_json()
    # A JSON array or scalar has no .get(); treat it like missing data.
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'Brak danych'}), 400

    company_ids = data.get('company_ids', [])
    steps = data.get('steps', ['registry', 'seo', 'social', 'gbp', 'logo'])

    if not company_ids:
        return jsonify({'error': 'Nie wybrano firm'}), 400

    # IDs must be a list of real integers (bool is an int subclass, so it
    # is excluded explicitly); anything else would break the DB lookup.
    if not isinstance(company_ids, list) or not all(
        isinstance(cid, int) and not isinstance(cid, bool) for cid in company_ids
    ):
        return jsonify({'error': 'Nieprawidłowe identyfikatory firm'}), 400

    # Drop duplicates while keeping the requested order, so a repeated ID is
    # neither processed twice nor misreported as "not found" below.
    company_ids = list(dict.fromkeys(company_ids))

    if len(company_ids) > 50:
        return jsonify({'error': 'Maksymalnie 50 firm na raz'}), 400

    valid_steps = {'registry', 'seo', 'social', 'gbp', 'logo'}
    if not isinstance(steps, list):
        steps = []
    # The isinstance check also protects the set lookup from unhashable items.
    steps = list(dict.fromkeys(s for s in steps if isinstance(s, str) and s in valid_steps))
    if not steps:
        return jsonify({'error': 'Nie wybrano kroków'}), 400

    db = SessionLocal()
    try:
        # Validate company IDs
        existing = db.query(Company.id).filter(Company.id.in_(company_ids)).all()
        existing_ids = {row[0] for row in existing}
        missing = set(company_ids) - existing_ids
        if missing:
            return jsonify({'error': f'Nie znaleziono firm: {missing}'}), 400

        # Create job
        job = BulkEnrichmentJob(
            started_by=current_user.id,
            total_companies=len(company_ids),
            steps=steps,
            results={},
        )
        db.add(job)
        db.commit()

        job_id = job.id

        # Start background thread; it opens its own DB session.
        thread = threading.Thread(
            target=_run_bulk_enrichment,
            args=(job_id, company_ids, steps),
            daemon=True,
        )
        thread.start()

        logger.info(f"Bulk enrichment job {job_id} started by {current_user.email}: {len(company_ids)} companies, steps={steps}")

        return jsonify({'job_id': job_id, 'total': len(company_ids)})

    finally:
        db.close()
|
|
|
|
|
|
@bp.route('/data-quality/bulk-enrich/status')
@login_required
@role_required(SystemRole.ADMIN)
def bulk_enrich_status():
    """Check status of a bulk enrichment job.

    Query parameters:
        job_id: integer ID of the job to inspect.

    Returns:
        JSON with ``job_id``, ``status``, ``processed``, ``total``,
        ``latest_result`` (a one-line summary of the last stored company
        result, or None) and ``results`` (the full results, or None while
        the job status is ``'running'``). Responds 400 when ``job_id`` is
        missing and 404 when no such job exists.
    """
    job_id = request.args.get('job_id', type=int)
    if not job_id:
        return jsonify({'error': 'Brak job_id'}), 400

    db = SessionLocal()
    try:
        job = db.query(BulkEnrichmentJob).filter_by(id=job_id).first()
        if not job:
            return jsonify({'error': 'Job nie znaleziony'}), 404

        # Get latest result for progress log
        latest_result = None
        results_dict = job.results
        if results_dict and job.processed_companies > 0:
            # Only the last stored entry is summarized; earlier entries are
            # not formatted at all on each poll.
            # NOTE(review): "last" means last key of the loaded dict — this
            # equals the last processed company only if the column keeps
            # key order (e.g. PostgreSQL JSONB does not); confirm.
            last_cid = list(results_dict)[-1]
            entry = results_dict[last_cid]
            name = entry.get('name', last_cid)
            step_results = entry.get('results', {})
            # A step counts as OK when its message starts with one of these.
            ok = sum(
                1
                for value in step_results.values()
                if isinstance(value, str) and value.startswith(('OK', 'SKIP (done)'))
            )
            total_steps = len(step_results)
            latest_result = f"{name}: {ok}/{total_steps} kroków OK"

        return jsonify({
            'job_id': job.id,
            'status': job.status,
            'processed': job.processed_companies,
            'total': job.total_companies,
            'latest_result': latest_result,
            # Full results are withheld while the job is still running.
            'results': job.results if job.status != 'running' else None,
        })

    finally:
        db.close()
|