mirror of
https://github.com/TronoSfera/Law.git
synced 2026-05-18 10:03:45 +03:00
71 lines
2.4 KiB
Python
71 lines
2.4 KiB
Python
from __future__ import annotations
|
|
|
|
from sqlalchemy import func
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import settings
|
|
from app.core.security import hash_password, verify_password
|
|
from app.models.admin_user import AdminUser
|
|
|
|
|
|
def normalize_admin_email(raw: str | None) -> str:
|
|
return str(raw or "").strip().lower()
|
|
|
|
|
|
def get_active_admin_by_email(db: Session, email: str) -> AdminUser | None:
|
|
normalized = normalize_admin_email(email)
|
|
if not normalized:
|
|
return None
|
|
return (
|
|
db.query(AdminUser)
|
|
.filter(func.lower(AdminUser.email) == normalized, AdminUser.is_active.is_(True))
|
|
.first()
|
|
)
|
|
|
|
|
|
def get_admin_by_email_any_state(db: Session, email: str) -> AdminUser | None:
|
|
normalized = normalize_admin_email(email)
|
|
if not normalized:
|
|
return None
|
|
return db.query(AdminUser).filter(func.lower(AdminUser.email) == normalized).first()
|
|
|
|
|
|
def ensure_bootstrap_admin_for_login(db: Session, email: str, password: str) -> AdminUser | None:
|
|
if not settings.ADMIN_BOOTSTRAP_ENABLED:
|
|
return None
|
|
|
|
normalized_email = normalize_admin_email(email)
|
|
bootstrap_email = normalize_admin_email(settings.ADMIN_BOOTSTRAP_EMAIL)
|
|
if normalized_email != bootstrap_email:
|
|
return None
|
|
if str(password or "") != str(settings.ADMIN_BOOTSTRAP_PASSWORD or ""):
|
|
return None
|
|
|
|
user = get_admin_by_email_any_state(db, bootstrap_email)
|
|
if user is None:
|
|
user = AdminUser(
|
|
role="ADMIN",
|
|
name=str(settings.ADMIN_BOOTSTRAP_NAME or "Администратор системы"),
|
|
email=bootstrap_email,
|
|
password_hash=hash_password(str(settings.ADMIN_BOOTSTRAP_PASSWORD or "")),
|
|
is_active=True,
|
|
)
|
|
db.add(user)
|
|
else:
|
|
user.role = "ADMIN"
|
|
user.email = bootstrap_email
|
|
user.is_active = True
|
|
if not str(user.name or "").strip():
|
|
user.name = str(settings.ADMIN_BOOTSTRAP_NAME or "Администратор системы")
|
|
if not verify_password(str(settings.ADMIN_BOOTSTRAP_PASSWORD or ""), str(user.password_hash or "")):
|
|
user.password_hash = hash_password(str(settings.ADMIN_BOOTSTRAP_PASSWORD or ""))
|
|
db.add(user)
|
|
|
|
try:
|
|
db.commit()
|
|
except IntegrityError:
|
|
db.rollback()
|
|
return get_active_admin_by_email(db, bootstrap_email)
|
|
db.refresh(user)
|
|
return user
|