nordabiz/search_service.py
2026-01-01 14:01:49 +01:00

418 lines
15 KiB
Python

"""
Search Service for Norda Biznes Hub
Unified search functionality with:
- NIP/REGON direct lookup
- Synonym expansion
- SQLite scoring fallback
- PostgreSQL FTS with fuzzy matching (when available)
"""
import logging
import re
from dataclasses import dataclass
from typing import List, Optional, Tuple

from sqlalchemy import text
from sqlalchemy.orm import Session

from database import Company, Service, Competency, CompanyService, CompanyCompetency
@dataclass
class SearchResult:
    """One company hit, its relevance score and how the hit was found."""

    # The matched company record.
    company: Company
    # Relevance score; higher means more relevant. The scale depends on the
    # search path that produced the hit.
    score: float
    # Origin of the hit. Values produced in this module: 'nip', 'regon',
    # 'exact', 'keyword', 'fts', 'fuzzy', 'history', 'all'.
    match_type: str
# Synonym dictionary - reused from nordabiz_chat.py
KEYWORD_SYNONYMS = {
# IT / Web
'strony': ['www', 'web', 'internet', 'witryny', 'seo', 'e-commerce', 'ecommerce', 'sklep', 'portal'],
'internetowe': ['www', 'web', 'online', 'cyfrowe', 'seo', 'marketing'],
'aplikacje': ['software', 'programowanie', 'systemy', 'crm', 'erp', 'app'],
'it': ['informatyka', 'komputery', 'software', 'systemy', 'serwis'],
'programowanie': ['software', 'kod', 'developer', 'aplikacje'],
# Budownictwo
'budowa': ['budownictwo', 'konstrukcje', 'remonty', 'wykończenia', 'dach', 'elewacja'],
'dom': ['budynek', 'mieszkanie', 'nieruchomości', 'budownictwo'],
'remont': ['wykończenie', 'naprawa', 'renowacja', 'modernizacja'],
'hala': ['magazyn', 'produkcja', 'przemysłowa', 'stalowa', 'konstrukcja'],
# Transport / Logistyka
'transport': ['przewóz', 'logistyka', 'spedycja', 'dostawa', 'kurier'],
'samochód': ['auto', 'pojazd', 'motoryzacja', 'serwis', 'naprawa'],
# Usługi
'księgowość': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe', 'kadry'],
'księgowa': ['rachunkowość', 'finanse', 'podatki', 'biuro rachunkowe'],
'prawo': ['prawnik', 'adwokat', 'radca', 'kancelaria', 'notariusz'],
'prawnik': ['prawo', 'adwokat', 'radca', 'kancelaria', 'prawny'],
'marketing': ['reklama', 'promocja', 'seo', 'social media', 'branding'],
# Produkcja
'produkcja': ['wytwarzanie', 'fabryka', 'zakład', 'przemysł'],
'metal': ['stal', 'obróbka', 'spawanie', 'cnc', 'ślusarstwo'],
'drewno': ['stolarka', 'meble', 'tartak', 'carpentry'],
'aluminium': ['alu', 'lekkie', 'profile', 'spawanie'],
}
class SearchService:
    """Unified search service for companies.

    :meth:`search` tries, in order: an exact NIP lookup, an exact REGON
    lookup, and finally a text search. The text search uses PostgreSQL
    full-text / trigram matching when the session is bound to PostgreSQL and
    an in-Python keyword scorer otherwise (or when the PostgreSQL path fails
    or finds nothing).
    """

    # Whitespace and dashes that users commonly type inside NIP/REGON numbers.
    _ID_SEPARATORS_RE = re.compile(r'[\s\-]')
    # A tsquery token: a run of letters, digits or underscores (Unicode-aware).
    _TSQUERY_TOKEN_RE = re.compile(r'\w+')

    def __init__(self, db: Session):
        """Store the session and detect once whether it talks to PostgreSQL."""
        self.db = db
        self._is_postgres = self._detect_postgres()

    def _detect_postgres(self) -> bool:
        """Return True when the session's bind reports the 'postgresql' dialect."""
        try:
            return self.db.bind.dialect.name == 'postgresql'
        except Exception:
            # Best effort: e.g. a session without a bind has no dialect.
            return False

    def search(
        self,
        query: str,
        category_id: Optional[int] = None,
        limit: int = 50
    ) -> List[SearchResult]:
        """
        Main search method.

        Args:
            query: Search query string (``None`` is treated as empty).
            category_id: Optional category filter.
            limit: Max results to return.

        Returns:
            List of SearchResult objects sorted by relevance. An empty query
            returns active companies ordered by name.
        """
        query = (query or '').strip()
        if not query:
            return self._get_all_companies(category_id, limit)

        # 1. NIP (10 digits) - a direct hit short-circuits everything else.
        if self._is_nip(query):
            hit = self._search_by_nip(query)
            if hit:
                return [hit]

        # 2. REGON (9 or 14 digits).
        if self._is_regon(query):
            hit = self._search_by_regon(query)
            if hit:
                return [hit]

        # 3. Text search with synonym expansion.
        if self._is_postgres:
            return self._search_postgres_fts(query, category_id, limit)
        return self._search_sqlite_fallback(query, category_id, limit)

    # ------------------------------------------------------------------
    # Identifier lookups
    # ------------------------------------------------------------------

    @classmethod
    def _strip_id_separators(cls, query: str) -> str:
        """Remove whitespace and dashes from a would-be NIP/REGON string."""
        return cls._ID_SEPARATORS_RE.sub('', query)

    def _is_nip(self, query: str) -> bool:
        """Check if query looks like a NIP (exactly 10 digits, separators ignored)."""
        return re.fullmatch(r'\d{10}', self._strip_id_separators(query)) is not None

    def _is_regon(self, query: str) -> bool:
        """Check if query looks like a REGON (9 or 14 digits, separators ignored)."""
        return re.fullmatch(r'\d{9}|\d{14}', self._strip_id_separators(query)) is not None

    def _lookup_by_identifier(self, column, query: str, match_type: str) -> Optional[SearchResult]:
        """Find the first active company whose ``column`` equals the cleaned query."""
        company = self.db.query(Company).filter(
            column == self._strip_id_separators(query),
            Company.status == 'active'
        ).first()
        if company is None:
            return None
        return SearchResult(company=company, score=100.0, match_type=match_type)

    def _search_by_nip(self, query: str) -> Optional[SearchResult]:
        """Direct NIP lookup among active companies."""
        return self._lookup_by_identifier(Company.nip, query, 'nip')

    def _search_by_regon(self, query: str) -> Optional[SearchResult]:
        """Direct REGON lookup among active companies."""
        return self._lookup_by_identifier(Company.regon, query, 'regon')

    # ------------------------------------------------------------------
    # Keyword handling
    # ------------------------------------------------------------------

    @staticmethod
    def _tokenize(query: str) -> List[str]:
        """Split a query into lower-cased words of at least two characters.

        Surrounding punctuation (``.,?!``) is stripped *before* the length
        check so that tokens such as ``"??"`` or ``"a."`` do not survive as
        empty or one-letter words.
        """
        stripped = (raw.strip('.,?!') for raw in query.lower().split())
        return [word for word in stripped if len(word) > 1]

    def _expand_keywords(self, query: str) -> List[str]:
        """Expand query words with synonyms from ``KEYWORD_SYNONYMS``.

        A synonym group is added when a query word is a substring of the
        group's key or of any of its synonyms. The result has no duplicates
        and keeps a stable order: query words first, then added terms.
        """
        words = self._tokenize(query)
        # dict used as an insertion-ordered set.
        expanded = dict.fromkeys(words)
        for word in words:
            for key, synonyms in KEYWORD_SYNONYMS.items():
                if word in key or any(word in syn for syn in synonyms):
                    expanded.setdefault(key)
                    for syn in synonyms:
                        expanded.setdefault(syn)
        return list(expanded)

    @classmethod
    def _build_tsquery(cls, keywords: List[str]) -> str:
        """Build a prefix-matching ``to_tsquery`` string from keywords.

        Each keyword is reduced to word tokens so that user punctuation cannot
        break tsquery syntax; multi-word keywords (e.g. 'social media') become
        an AND group. Keywords are OR-ed together. Returns '' when no keyword
        yields a token.
        """
        alternatives = []
        for keyword in keywords:
            tokens = cls._TSQUERY_TOKEN_RE.findall(keyword)
            if not tokens:
                continue
            term = ' & '.join(f'{token}:*' for token in tokens)
            alternatives.append(f'({term})' if len(tokens) > 1 else term)
        return ' | '.join(alternatives)

    @staticmethod
    def _escape_like(value: str) -> str:
        """Escape LIKE wildcards (``%``, ``_``) and the backslash escape char."""
        return value.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')

    # ------------------------------------------------------------------
    # In-Python keyword scoring
    # ------------------------------------------------------------------

    def _search_sqlite_fallback(
        self,
        query: str,
        category_id: Optional[int],
        limit: int
    ) -> List[SearchResult]:
        """
        Keyword-scoring search performed in Python (used for non-PostgreSQL
        databases and as a fallback for the PostgreSQL path).

        Loads every active company (optionally restricted to ``category_id``)
        and adds up the following weights:
        - Company name contains a keyword: +10
        - Company name contains the whole query: +5 (match_type 'exact')
        - Short description match: +5
        - Service match: +8 (counted once)
        - Competency match: +7 (counted once)
        - City match: +3
        - Founding history match: +12 (owners/founders)
        - Full description match: +4

        Companies scoring 0 are dropped; the rest are returned best-first,
        truncated to ``limit``.
        """
        keywords = self._expand_keywords(query)
        query_lower = query.lower()

        def matches(value) -> bool:
            """True when ``value`` is non-empty text containing any keyword."""
            if not value:
                return False
            haystack = value.lower()
            return any(kw in haystack for kw in keywords)

        companies_query = self.db.query(Company).filter(Company.status == 'active')
        if category_id:
            companies_query = companies_query.filter(Company.category_id == category_id)

        scored: List[Tuple[float, Company, str]] = []
        for company in companies_query.all():
            score = 0.0
            match_type = 'keyword'

            # Name match (highest regular weight); tolerate a missing name.
            name_lower = (company.name or '').lower()
            if any(kw in name_lower for kw in keywords):
                score += 10

            # Exact match bonus: the whole query appears in the name.
            if query_lower in name_lower:
                score += 5
                match_type = 'exact'

            # Short description match
            if matches(company.description_short):
                score += 5

            # Services match - counted once per company
            if any(cs.service and matches(cs.service.name) for cs in (company.services or ())):
                score += 8

            # Competencies match - counted once per company
            if any(cc.competency and matches(cc.competency.name) for cc in (company.competencies or ())):
                score += 7

            # City match
            if matches(company.address_city):
                score += 3

            # Founding history (owners, history) - high weight for people search
            if matches(company.founding_history):
                score += 12

            # Full description match
            if matches(company.description_full):
                score += 4

            if score > 0:
                scored.append((score, company, match_type))

        # Best score first; the sort is stable so ties keep query order.
        scored.sort(key=lambda item: item[0], reverse=True)
        return [
            SearchResult(company=company, score=score, match_type=match_type)
            for score, company, match_type in scored[:limit]
        ]

    # ------------------------------------------------------------------
    # PostgreSQL full-text search
    # ------------------------------------------------------------------

    def _has_pg_trgm(self) -> bool:
        """Return True only if the pg_trgm extension is actually installed."""
        try:
            row = self.db.execute(
                text("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'")
            ).first()
        except Exception:
            # A failed statement leaves the transaction aborted; reset it.
            self.db.rollback()
            return False
        # The query succeeds even when the extension is absent - it then
        # simply returns no row.
        return row is not None

    def _search_postgres_fts(
        self,
        query: str,
        category_id: Optional[int],
        limit: int
    ) -> List[SearchResult]:
        """
        PostgreSQL full-text search with fuzzy matching.

        Uses:
        - tsvector (``companies.search_vector``) for full-text search
        - pg_trgm ``similarity`` on the company name for typos, when installed
        - ILIKE on name/descriptions/founding history for substring hits

        Each result's score is the best of the FTS rank, the trigram
        similarity and a fixed 0.5 for a founding-history hit, multiplied by
        100; ``match_type`` is 'fts', 'fuzzy' or 'history' accordingly.

        Falls back to :meth:`_search_sqlite_fallback` when no usable tsquery
        can be built, when the SQL fails, or when it returns no rows.
        """
        keywords = self._expand_keywords(query)
        tsquery = self._build_tsquery(keywords)
        if not tsquery:
            # Nothing searchable for FTS (e.g. one-letter query).
            return self._search_sqlite_fallback(query, category_id, limit)

        # ILIKE patterns for each keyword longer than two characters.
        like_patterns = [f'%{self._escape_like(kw)}%' for kw in keywords if len(kw) > 2]
        if not like_patterns:
            # Only very short keywords: match on the raw query instead of a
            # catch-all pattern, which would return every company.
            like_patterns = [f'%{self._escape_like(query)}%']

        has_trgm = self._has_pg_trgm()

        # Score expressions are repeated verbatim in ORDER BY because
        # PostgreSQL does not allow output-column aliases inside expressions.
        fts_score = "COALESCE(ts_rank(c.search_vector, to_tsquery('simple', :tsquery)), 0)"
        history_score = "CASE WHEN c.founding_history ILIKE ANY(:like_patterns) THEN 0.5 ELSE 0 END"
        if has_trgm:
            fuzzy_score = "COALESCE(similarity(c.name, :query), 0)"
            fuzzy_filter = "OR similarity(c.name, :query) > 0.2"
        else:
            fuzzy_score = "0"
            fuzzy_filter = ""
        category_filter = "AND c.category_id = :category_id" if category_id else ""

        # Only the constant fragments above are interpolated; all user input
        # is passed as bound parameters.
        sql = text(f"""
            SELECT c.id,
                   {fts_score} AS fts_score,
                   {fuzzy_score} AS fuzzy_score,
                   {history_score} AS history_score
            FROM companies c
            WHERE c.status = 'active'
              AND (
                  c.search_vector @@ to_tsquery('simple', :tsquery)
                  {fuzzy_filter}
                  OR c.name ILIKE ANY(:like_patterns)
                  OR c.description_short ILIKE ANY(:like_patterns)
                  OR c.founding_history ILIKE ANY(:like_patterns)
                  OR c.description_full ILIKE ANY(:like_patterns)
              )
              {category_filter}
            ORDER BY GREATEST({fts_score}, {fuzzy_score}, {history_score}) DESC
            LIMIT :limit
        """)

        params = {
            'tsquery': tsquery,
            'like_patterns': like_patterns,
            'limit': limit
        }
        if has_trgm:
            params['query'] = query
        if category_id:
            params['category_id'] = category_id

        try:
            rows = self.db.execute(sql, params).fetchall()
            company_ids = [row[0] for row in rows]
            companies_map = {
                c.id: c
                for c in self.db.query(Company).filter(Company.id.in_(company_ids)).all()
            } if company_ids else {}
        except Exception as exc:
            # If FTS fails, roll back the aborted transaction and fall back.
            logging.getLogger(__name__).warning(
                "PostgreSQL FTS error: %s, falling back to keyword search", exc
            )
            self.db.rollback()
            return self._search_sqlite_fallback(query, category_id, limit)

        if not rows:
            # FTS found nothing - give the keyword scorer a chance.
            return self._search_sqlite_fallback(query, category_id, limit)

        results = []
        for company_id, fts, fuzzy, history in rows:
            company = companies_map.get(company_id)
            if company is None:
                continue
            # float() normalises Decimal/None values coming from the driver.
            scores = {
                'fts': float(fts or 0),
                'fuzzy': float(fuzzy or 0),
                'history': float(history or 0)
            }
            best_match = max(scores, key=scores.get)
            results.append(SearchResult(
                company=company,
                score=scores[best_match] * 100,
                match_type=best_match
            ))
        return results

    def _get_all_companies(
        self,
        category_id: Optional[int],
        limit: int
    ) -> List[SearchResult]:
        """Return active companies ordered by name when no query is given."""
        companies_query = self.db.query(Company).filter(Company.status == 'active')
        if category_id:
            companies_query = companies_query.filter(Company.category_id == category_id)
        companies = companies_query.order_by(Company.name).limit(limit).all()
        return [
            SearchResult(company=company, score=0.0, match_type='all')
            for company in companies
        ]
# Module-level shortcut so route handlers do not have to build a SearchService.
def search_companies(
    db: Session,
    query: str,
    category_id: Optional[int] = None,
    limit: int = 50,
) -> List[SearchResult]:
    """
    Search companies using a freshly created ``SearchService``.

    Args:
        db: Database session the service will query through.
        query: Search query string.
        category_id: Optional category filter.
        limit: Maximum number of results.

    Returns:
        List of SearchResult objects, as returned by ``SearchService.search``.
    """
    return SearchService(db).search(query, category_id, limit)