feat(zopk): Skrypt pipeline do automatycznej ekstrakcji wiedzy
Uruchamia po kolei: scraping treści, ekstrakcję AI, generowanie embeddingów. Do użycia w cron job co godzinę. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
081c0d7ec5
commit
cbd9c5cc4d
148
scripts/zopk_knowledge_pipeline.py
Executable file
148
scripts/zopk_knowledge_pipeline.py
Executable file
@ -0,0 +1,148 @@
|
||||
#!/usr/bin/env python3
"""
ZOPK Knowledge Pipeline - automated knowledge-extraction pipeline.

Runs, in order:
1. Article content scraping (Google News URL decode + fetch)
2. AI extraction (chunks, facts, entities)
3. Embedding generation (pgvector)

Usage:
    python scripts/zopk_knowledge_pipeline.py

Cron (hourly):
0 * * * * cd /var/www/nordabiznes && /var/www/nordabiznes/venv/bin/python3 scripts/zopk_knowledge_pipeline.py >> /var/log/nordabiznes/knowledge_pipeline.log 2>&1
"""

import os
import sys
import logging
from datetime import datetime

# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Load .env before anything reads environment variables (e.g. DATABASE_URL).
from dotenv import load_dotenv
load_dotenv()

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_db_session():
    """Open and return a new SQLAlchemy session bound to DATABASE_URL.

    Raises:
        ValueError: if the DATABASE_URL environment variable is unset/empty.
    """
    database_url = os.getenv('DATABASE_URL')
    if not database_url:
        raise ValueError("DATABASE_URL not set in environment")

    # One engine + session factory per call; the script is short-lived.
    session_factory = sessionmaker(bind=create_engine(database_url))
    return session_factory()
|
||||
|
||||
|
||||
def run_scraping(db, limit: int = 50) -> dict:
    """Step 1: Scrape article content.

    Returns the scraper's stats dict (keys: 'scraped', 'failed', 'skipped').
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("STEP 1: Scraping article content")
    logger.info(banner)

    # Imported lazily so the module loads even if this step is never reached.
    from zopk_content_scraper import ZOPKContentScraper

    stats = ZOPKContentScraper(db).batch_scrape(limit=limit)

    logger.info(f"Scraping complete: {stats['scraped']} scraped, {stats['failed']} failed, {stats['skipped']} skipped")
    return stats
|
||||
|
||||
|
||||
def run_extraction(db, limit: int = 50) -> dict:
    """Step 2: Extract knowledge with AI.

    Returns the service's stats dict (keys: 'processed', 'failed').
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("STEP 2: AI Knowledge Extraction")
    logger.info(banner)

    # Imported lazily so the module loads even if this step is never reached.
    from zopk_knowledge_service import ZOPKKnowledgeService

    stats = ZOPKKnowledgeService(db).batch_extract(limit=limit)

    logger.info(f"Extraction complete: {stats['processed']} processed, {stats['failed']} failed")
    return stats
|
||||
|
||||
|
||||
def run_embeddings(db, limit: int = 100) -> dict:
    """Step 3: Generate embeddings.

    Returns the generator's stats dict (keys: 'generated', 'failed').
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("STEP 3: Generating Embeddings")
    logger.info(banner)

    # Imported lazily so the module loads even if this step is never reached.
    from zopk_knowledge_service import generate_chunk_embeddings

    stats = generate_chunk_embeddings(db, limit=limit)

    logger.info(f"Embeddings complete: {stats['generated']} generated, {stats['failed']} failed")
    return stats
|
||||
|
||||
|
||||
def main():
    """Run the full knowledge pipeline.

    Executes scraping, AI extraction, and embedding generation in sequence,
    then logs a summary.

    Returns:
        int: 0 on success, 1 on any failure (process exit code for cron).

    Fixes vs. previous version:
    - Removed the ``... or True`` conditions: they made both ``else``
      branches permanently dead code. Each step is simply always attempted,
      which is what the code actually did (pending articles/chunks from
      earlier runs may still need processing even when nothing new arrived).
    - ``db.close()`` now runs in a ``finally`` block, so the DB session is
      not leaked when a step raises.
    """
    start_time = datetime.now()
    logger.info("=" * 60)
    logger.info("ZOPK KNOWLEDGE PIPELINE STARTED")
    logger.info(f"Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info("=" * 60)

    try:
        db = get_db_session()
        try:
            # Step 1: Scraping
            scrape_stats = run_scraping(db, limit=50)

            # Step 2: AI Extraction - always attempted; there may be
            # previously scraped but unprocessed articles.
            extract_stats = run_extraction(db, limit=50)

            # Step 3: Embeddings - always attempted; chunks from earlier
            # runs may still lack embeddings.
            embed_stats = run_embeddings(db, limit=100)
        finally:
            # Close the session even if a step raised (connection leak fix).
            db.close()

        # Summary
        duration = (datetime.now() - start_time).total_seconds()

        logger.info("=" * 60)
        logger.info("PIPELINE SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Scraping: {scrape_stats['scraped']} success, {scrape_stats['failed']} failed")
        logger.info(f"Extraction: {extract_stats.get('processed', 0)} success, {extract_stats.get('failed', 0)} failed")
        logger.info(f"Embeddings: {embed_stats.get('generated', 0)} success, {embed_stats.get('failed', 0)} failed")
        logger.info(f"Duration: {duration:.1f} seconds")
        logger.info("=" * 60)
        logger.info("PIPELINE COMPLETED SUCCESSFULLY")
        logger.info("=" * 60)

        return 0

    except Exception as e:
        logger.error(f"Pipeline failed with error: {e}", exc_info=True)
        return 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # SystemExit carries main()'s return code, same as sys.exit(main()).
    raise SystemExit(main())
|
||||
Loading…
Reference in New Issue
Block a user