Replace ~170 manual `if not current_user.is_admin` checks with: - @role_required(SystemRole.ADMIN) for user management, security, ZOPK - @role_required(SystemRole.OFFICE_MANAGER) for content management - current_user.can_access_admin_panel() for admin UI access - current_user.can_moderate_forum() for forum moderation - current_user.can_edit_company(id) for company permissions Add @office_manager_required decorator shortcut. Add SQL migration to sync existing users' role field. Role hierarchy: UNAFFILIATED(10) < MEMBER(20) < EMPLOYEE(30) < MANAGER(40) < OFFICE_MANAGER(50) < ADMIN(100) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
478 lines
16 KiB
Python
478 lines
16 KiB
Python
"""
|
|
Admin Routes - Companies
|
|
========================
|
|
|
|
CRUD operations for company management in admin panel.
|
|
"""
|
|
|
|
import re
|
|
import csv
|
|
import logging
|
|
from io import StringIO
|
|
from datetime import datetime
|
|
|
|
from flask import render_template, request, redirect, url_for, flash, jsonify, Response
|
|
from flask_login import login_required, current_user
|
|
|
|
from . import bp
|
|
from database import SessionLocal, Company, Category, User, Person, CompanyPerson, SystemRole
|
|
from utils.decorators import role_required
|
|
|
|
# Logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def validate_nip(nip: str) -> bool:
|
|
"""Validate Polish NIP number (10 digits, checksum)"""
|
|
if not nip or not re.match(r'^\d{10}$', nip):
|
|
return False
|
|
|
|
weights = [6, 5, 7, 2, 3, 4, 5, 6, 7]
|
|
checksum = sum(int(nip[i]) * weights[i] for i in range(9)) % 11
|
|
return checksum == int(nip[9])
|
|
|
|
|
|
# ============================================================
|
|
# COMPANIES ADMIN ROUTES
|
|
# ============================================================
|
|
|
|
@bp.route('/companies')
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_companies():
|
|
"""Admin panel for company management"""
|
|
db = SessionLocal()
|
|
try:
|
|
# Get filter parameters
|
|
status_filter = request.args.get('status', 'all')
|
|
category_filter = request.args.get('category', '')
|
|
quality_filter = request.args.get('quality', '')
|
|
search_query = request.args.get('q', '').strip()
|
|
|
|
# Base query
|
|
query = db.query(Company)
|
|
|
|
# Apply filters
|
|
if status_filter and status_filter != 'all':
|
|
query = query.filter(Company.status == status_filter)
|
|
|
|
if category_filter:
|
|
query = query.filter(Company.category_id == int(category_filter))
|
|
|
|
if quality_filter:
|
|
query = query.filter(Company.data_quality == quality_filter)
|
|
|
|
if search_query:
|
|
search_pattern = f'%{search_query}%'
|
|
query = query.filter(
|
|
(Company.name.ilike(search_pattern)) |
|
|
(Company.nip.ilike(search_pattern))
|
|
)
|
|
|
|
# Order and fetch
|
|
companies = query.order_by(Company.name).all()
|
|
|
|
# Get categories for filter dropdown
|
|
categories = db.query(Category).order_by(Category.name).all()
|
|
|
|
# Statistics
|
|
total_companies = db.query(Company).count()
|
|
active_count = db.query(Company).filter(Company.status == 'active').count()
|
|
pending_count = db.query(Company).filter(Company.status == 'pending').count()
|
|
inactive_count = db.query(Company).filter(Company.status == 'inactive').count()
|
|
|
|
logger.info(f"Admin {current_user.email} accessed companies panel - {total_companies} companies")
|
|
|
|
return render_template(
|
|
'admin/companies.html',
|
|
companies=companies,
|
|
categories=categories,
|
|
total_companies=total_companies,
|
|
active_count=active_count,
|
|
pending_count=pending_count,
|
|
inactive_count=inactive_count,
|
|
current_status=status_filter,
|
|
current_category=category_filter,
|
|
current_quality=quality_filter,
|
|
search_query=search_query
|
|
)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/add', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_company_add():
|
|
"""Create a new company"""
|
|
db = SessionLocal()
|
|
try:
|
|
data = request.get_json() or {}
|
|
|
|
name = data.get('name', '').strip()
|
|
if not name:
|
|
return jsonify({'success': False, 'error': 'Nazwa firmy jest wymagana'}), 400
|
|
|
|
nip = data.get('nip', '').strip().replace('-', '').replace(' ', '')
|
|
if nip:
|
|
if not validate_nip(nip):
|
|
return jsonify({'success': False, 'error': 'Nieprawidłowy NIP'}), 400
|
|
|
|
existing = db.query(Company).filter(Company.nip == nip).first()
|
|
if existing:
|
|
return jsonify({'success': False, 'error': f'Firma z NIP {nip} już istnieje'}), 400
|
|
|
|
# Generate slug from name
|
|
slug = re.sub(r'[^\w\s-]', '', name.lower())
|
|
slug = re.sub(r'[\s_]+', '-', slug)
|
|
slug = re.sub(r'-+', '-', slug).strip('-')
|
|
|
|
# Ensure unique slug
|
|
base_slug = slug
|
|
counter = 1
|
|
while db.query(Company).filter(Company.slug == slug).first():
|
|
slug = f"{base_slug}-{counter}"
|
|
counter += 1
|
|
|
|
new_company = Company(
|
|
name=name,
|
|
slug=slug,
|
|
nip=nip if nip else None,
|
|
category_id=data.get('category_id') or None,
|
|
status=data.get('status', 'pending'),
|
|
email=data.get('email', '').strip() or None,
|
|
phone=data.get('phone', '').strip() or None,
|
|
address_city=data.get('address_city', '').strip() or None,
|
|
address_street=data.get('address_street', '').strip() or None,
|
|
address_postal=data.get('address_postal', '').strip() or None,
|
|
data_quality='basic'
|
|
)
|
|
|
|
db.add(new_company)
|
|
db.commit()
|
|
db.refresh(new_company)
|
|
|
|
logger.info(f"Admin {current_user.email} created new company: {name} (ID: {new_company.id})")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_id': new_company.id,
|
|
'message': f'Firma "{name}" została utworzona'
|
|
})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error creating company: {e}")
|
|
return jsonify({'success': False, 'error': 'Błąd podczas tworzenia firmy'}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/<int:company_id>')
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_company_get(company_id):
|
|
"""Get company details (JSON)"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter(Company.id == company_id).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company': {
|
|
'id': company.id,
|
|
'name': company.name,
|
|
'nip': company.nip,
|
|
'category_id': company.category_id,
|
|
'status': company.status,
|
|
'email': company.email,
|
|
'phone': company.phone,
|
|
'address_city': company.address_city,
|
|
'address_street': company.address_street,
|
|
'address_postal': company.address_postal,
|
|
'data_quality': company.data_quality
|
|
}
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/<int:company_id>/update', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_company_update(company_id):
|
|
"""Update company data"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter(Company.id == company_id).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
|
|
|
|
data = request.get_json() or {}
|
|
|
|
if 'name' in data:
|
|
name = data['name'].strip()
|
|
if not name:
|
|
return jsonify({'success': False, 'error': 'Nazwa firmy jest wymagana'}), 400
|
|
company.name = name
|
|
|
|
if 'nip' in data:
|
|
nip = data['nip'].strip().replace('-', '').replace(' ', '') if data['nip'] else ''
|
|
if nip:
|
|
if not validate_nip(nip):
|
|
return jsonify({'success': False, 'error': 'Nieprawidłowy NIP'}), 400
|
|
|
|
existing = db.query(Company).filter(Company.nip == nip, Company.id != company_id).first()
|
|
if existing:
|
|
return jsonify({'success': False, 'error': f'Firma z NIP {nip} już istnieje'}), 400
|
|
company.nip = nip if nip else None
|
|
|
|
if 'category_id' in data:
|
|
company.category_id = data['category_id'] if data['category_id'] else None
|
|
|
|
if 'status' in data:
|
|
if data['status'] in ['active', 'pending', 'inactive', 'archived']:
|
|
company.status = data['status']
|
|
|
|
if 'email' in data:
|
|
company.email = data['email'].strip() if data['email'] else None
|
|
|
|
if 'phone' in data:
|
|
company.phone = data['phone'].strip() if data['phone'] else None
|
|
|
|
if 'address_city' in data:
|
|
company.address_city = data['address_city'].strip() if data['address_city'] else None
|
|
|
|
if 'address_street' in data:
|
|
company.address_street = data['address_street'].strip() if data['address_street'] else None
|
|
|
|
if 'address_postal' in data:
|
|
company.address_postal = data['address_postal'].strip() if data['address_postal'] else None
|
|
|
|
company.last_updated = datetime.utcnow()
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} updated company {company.name} (ID: {company_id})")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Dane firmy zaktualizowane'
|
|
})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"Error updating company {company_id}: {e}")
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/<int:company_id>/toggle-status', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_company_toggle_status(company_id):
|
|
"""Toggle company status (active <-> inactive)"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter(Company.id == company_id).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
|
|
|
|
if company.status == 'active':
|
|
company.status = 'inactive'
|
|
else:
|
|
company.status = 'active'
|
|
|
|
company.last_updated = datetime.utcnow()
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} toggled company {company.name} status to {company.status}")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'status': company.status,
|
|
'message': f"Status zmieniony na {'aktywna' if company.status == 'active' else 'nieaktywna'}"
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/<int:company_id>/delete', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_company_delete(company_id):
|
|
"""Soft delete company (set status to archived)"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter(Company.id == company_id).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
|
|
|
|
company.status = 'archived'
|
|
company.last_updated = datetime.utcnow()
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} archived company {company.name} (ID: {company_id})")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Firma "{company.name}" została zarchiwizowana'
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/<int:company_id>/assign-user', methods=['POST'])
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_company_assign_user(company_id):
|
|
"""Assign a user to a company"""
|
|
db = SessionLocal()
|
|
try:
|
|
data = request.get_json() or {}
|
|
user_id = data.get('user_id')
|
|
|
|
company = db.query(Company).filter(Company.id == company_id).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
|
|
|
|
if user_id:
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
return jsonify({'success': False, 'error': 'Użytkownik nie istnieje'}), 404
|
|
|
|
user.company_id = company_id
|
|
db.commit()
|
|
|
|
logger.info(f"Admin {current_user.email} assigned user {user.email} to company {company.name}")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'Użytkownik {user.email} przypisany do {company.name}'
|
|
})
|
|
else:
|
|
return jsonify({'success': False, 'error': 'Nie podano ID użytkownika'}), 400
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/<int:company_id>/people')
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_company_people(company_id):
|
|
"""Get people associated with a company"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter(Company.id == company_id).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
|
|
|
|
# Get CompanyPerson relationships
|
|
people_roles = db.query(CompanyPerson).filter(
|
|
CompanyPerson.company_id == company_id
|
|
).all()
|
|
|
|
people_list = []
|
|
for cp in people_roles:
|
|
person = cp.person
|
|
people_list.append({
|
|
'id': cp.id,
|
|
'person_id': person.id,
|
|
'imiona': person.imiona,
|
|
'nazwisko': person.nazwisko,
|
|
'pesel_masked': f"{person.pesel[:4]}******" if person.pesel else None,
|
|
'role': cp.role,
|
|
'role_category': cp.role_category,
|
|
'shares_percent': float(cp.shares_percent) if cp.shares_percent else None
|
|
})
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_name': company.name,
|
|
'people': people_list
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/<int:company_id>/users')
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_company_users(company_id):
|
|
"""Get users assigned to a company"""
|
|
db = SessionLocal()
|
|
try:
|
|
company = db.query(Company).filter(Company.id == company_id).first()
|
|
if not company:
|
|
return jsonify({'success': False, 'error': 'Firma nie istnieje'}), 404
|
|
|
|
users = db.query(User).filter(User.company_id == company_id).all()
|
|
|
|
users_list = [{
|
|
'id': u.id,
|
|
'name': u.name,
|
|
'email': u.email,
|
|
'is_admin': u.is_admin,
|
|
'is_verified': u.is_verified
|
|
} for u in users]
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'company_name': company.name,
|
|
'users': users_list
|
|
})
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@bp.route('/companies/export')
|
|
@login_required
|
|
@role_required(SystemRole.OFFICE_MANAGER)
|
|
def admin_companies_export():
|
|
"""Export companies to CSV"""
|
|
db = SessionLocal()
|
|
try:
|
|
companies = db.query(Company).order_by(Company.name).all()
|
|
|
|
output = StringIO()
|
|
writer = csv.writer(output)
|
|
|
|
# Header
|
|
writer.writerow([
|
|
'ID', 'Nazwa', 'NIP', 'Kategoria', 'Status',
|
|
'Email', 'Telefon', 'Miasto', 'Ulica', 'Kod pocztowy',
|
|
'Jakość danych', 'Data utworzenia'
|
|
])
|
|
|
|
# Data rows
|
|
for c in companies:
|
|
writer.writerow([
|
|
c.id,
|
|
c.name,
|
|
c.nip or '',
|
|
c.category.name if c.category else '',
|
|
c.status or '',
|
|
c.email or '',
|
|
c.phone or '',
|
|
c.address_city or '',
|
|
c.address_street or '',
|
|
c.address_postal or '',
|
|
c.data_quality or '',
|
|
c.created_at.strftime('%Y-%m-%d') if c.created_at else ''
|
|
])
|
|
|
|
output.seek(0)
|
|
|
|
logger.info(f"Admin {current_user.email} exported {len(companies)} companies to CSV")
|
|
|
|
return Response(
|
|
output.getvalue(),
|
|
mimetype='text/csv',
|
|
headers={
|
|
'Content-Disposition': f'attachment; filename=companies_{datetime.now().strftime("%Y%m%d")}.csv'
|
|
}
|
|
)
|
|
finally:
|
|
db.close()
|