Law/tests/test_admin_auth.py
2026-03-01 17:31:09 +03:00

247 lines
9.1 KiB
Python

import os
import unittest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, delete
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
os.environ.setdefault("DATABASE_URL", "sqlite+pysqlite:///:memory:")
os.environ.setdefault("REDIS_URL", "redis://localhost:6379/0")
os.environ.setdefault("S3_ENDPOINT", "http://localhost:9000")
os.environ.setdefault("S3_ACCESS_KEY", "test")
os.environ.setdefault("S3_SECRET_KEY", "test")
os.environ.setdefault("S3_BUCKET", "test")
from app.core.config import settings
from app.core.security import decode_jwt, hash_password
from app.db.session import get_db
from app.main import app
from app.models.admin_user import AdminUser
from app.services.totp_service import (
current_totp_code,
encrypt_totp_secret,
generate_backup_codes,
generate_totp_secret,
)
class AdminAuthTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
cls.SessionLocal = sessionmaker(bind=cls.engine, autocommit=False, autoflush=False)
AdminUser.__table__.create(bind=cls.engine)
@classmethod
def tearDownClass(cls):
AdminUser.__table__.drop(bind=cls.engine)
cls.engine.dispose()
def setUp(self):
with self.SessionLocal() as db:
db.execute(delete(AdminUser))
db.commit()
def override_get_db():
db = self.SessionLocal()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
self.client = TestClient(app)
self._settings_backup = {
"ADMIN_BOOTSTRAP_ENABLED": settings.ADMIN_BOOTSTRAP_ENABLED,
"ADMIN_BOOTSTRAP_EMAIL": settings.ADMIN_BOOTSTRAP_EMAIL,
"ADMIN_BOOTSTRAP_PASSWORD": settings.ADMIN_BOOTSTRAP_PASSWORD,
"ADMIN_BOOTSTRAP_NAME": settings.ADMIN_BOOTSTRAP_NAME,
"ADMIN_AUTH_MODE": settings.ADMIN_AUTH_MODE,
}
settings.ADMIN_BOOTSTRAP_ENABLED = True
settings.ADMIN_BOOTSTRAP_EMAIL = "admin@example.com"
settings.ADMIN_BOOTSTRAP_PASSWORD = "admin123"
settings.ADMIN_BOOTSTRAP_NAME = "Администратор системы"
settings.ADMIN_AUTH_MODE = "password_totp_optional"
def tearDown(self):
self.client.close()
app.dependency_overrides.clear()
for key, value in self._settings_backup.items():
setattr(settings, key, value)
def test_login_bootstraps_admin_when_absent(self):
response = self.client.post(
"/api/admin/auth/login",
json={"email": "admin@example.com", "password": "admin123"},
)
self.assertEqual(response.status_code, 200)
token = response.json().get("access_token")
self.assertTrue(token)
claims = decode_jwt(token, settings.ADMIN_JWT_SECRET)
self.assertEqual(claims.get("email"), "admin@example.com")
self.assertEqual(claims.get("role"), "ADMIN")
with self.SessionLocal() as db:
row = db.query(AdminUser).filter(AdminUser.email == "admin@example.com").first()
self.assertIsNotNone(row)
self.assertEqual(row.role, "ADMIN")
self.assertTrue(bool(row.is_active))
def test_login_rejects_wrong_bootstrap_password(self):
response = self.client.post(
"/api/admin/auth/login",
json={"email": "admin@example.com", "password": "wrong-password"},
)
self.assertEqual(response.status_code, 401)
with self.SessionLocal() as db:
count = db.query(AdminUser).count()
self.assertEqual(count, 0)
def test_existing_admin_is_normalized_to_bootstrap_credentials(self):
with self.SessionLocal() as db:
db.add(
AdminUser(
role="ADMIN",
name="Администратор",
email="admin@example.com",
password_hash=hash_password("custom-pass-1"),
is_active=True,
)
)
db.commit()
ok = self.client.post(
"/api/admin/auth/login",
json={"email": "admin@example.com", "password": "admin123"},
)
self.assertEqual(ok.status_code, 200)
self.assertTrue(ok.json().get("access_token"))
wrong = self.client.post(
"/api/admin/auth/login",
json={"email": "admin@example.com", "password": "custom-pass-1"},
)
self.assertEqual(wrong.status_code, 401)
def test_totp_required_mode_rejects_login_without_totp_code(self):
settings.ADMIN_AUTH_MODE = "password_totp_required"
secret = generate_totp_secret()
with self.SessionLocal() as db:
db.add(
AdminUser(
role="ADMIN",
name="TOTP Admin",
email="totp@example.com",
password_hash=hash_password("pass123"),
is_active=True,
totp_enabled=True,
totp_secret_encrypted=encrypt_totp_secret(secret),
totp_backup_codes_hashes=[],
)
)
db.commit()
response = self.client.post(
"/api/admin/auth/login",
json={"email": "totp@example.com", "password": "pass123"},
)
self.assertEqual(response.status_code, 401)
self.assertIn("TOTP", str(response.json().get("detail", "")))
def test_totp_required_mode_allows_login_with_valid_totp(self):
settings.ADMIN_AUTH_MODE = "password_totp_required"
secret = generate_totp_secret()
code = current_totp_code(secret)
with self.SessionLocal() as db:
db.add(
AdminUser(
role="ADMIN",
name="TOTP Admin",
email="totp2@example.com",
password_hash=hash_password("pass123"),
is_active=True,
totp_enabled=True,
totp_secret_encrypted=encrypt_totp_secret(secret),
totp_backup_codes_hashes=[],
)
)
db.commit()
response = self.client.post(
"/api/admin/auth/login",
json={"email": "totp2@example.com", "password": "pass123", "totp_code": code},
)
self.assertEqual(response.status_code, 200)
self.assertTrue(bool(response.json().get("access_token")))
def test_totp_backup_code_is_single_use(self):
settings.ADMIN_AUTH_MODE = "password_totp_required"
secret = generate_totp_secret()
backup_plain, backup_hashes = generate_backup_codes()
backup_code = backup_plain[0]
with self.SessionLocal() as db:
db.add(
AdminUser(
role="ADMIN",
name="Backup Admin",
email="totp3@example.com",
password_hash=hash_password("pass123"),
is_active=True,
totp_enabled=True,
totp_secret_encrypted=encrypt_totp_secret(secret),
totp_backup_codes_hashes=backup_hashes,
)
)
db.commit()
first = self.client.post(
"/api/admin/auth/login",
json={"email": "totp3@example.com", "password": "pass123", "backup_code": backup_code},
)
self.assertEqual(first.status_code, 200)
second = self.client.post(
"/api/admin/auth/login",
json={"email": "totp3@example.com", "password": "pass123", "backup_code": backup_code},
)
self.assertEqual(second.status_code, 401)
def test_totp_setup_enable_and_status_flow(self):
login = self.client.post(
"/api/admin/auth/login",
json={"email": "admin@example.com", "password": "admin123"},
)
self.assertEqual(login.status_code, 200)
token = login.json().get("access_token")
self.assertTrue(token)
headers = {"Authorization": "Bearer " + token}
setup = self.client.post("/api/admin/auth/totp/setup", json={}, headers=headers)
self.assertEqual(setup.status_code, 200)
setup_body = setup.json()
secret = str(setup_body.get("secret") or "")
self.assertTrue(secret)
self.assertIn("otpauth://totp/", str(setup_body.get("otpauth_uri") or ""))
code = current_totp_code(secret)
enable = self.client.post(
"/api/admin/auth/totp/enable",
json={"secret": secret, "code": code},
headers=headers,
)
self.assertEqual(enable.status_code, 200)
backup_codes = enable.json().get("backup_codes") or []
self.assertTrue(isinstance(backup_codes, list) and len(backup_codes) > 0)
status = self.client.get("/api/admin/auth/totp/status", headers=headers)
self.assertEqual(status.status_code, 200)
self.assertTrue(bool(status.json().get("enabled")))