Law/tests/test_security_audit.py
2026-03-02 16:22:07 +03:00

263 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import unittest
from datetime import timedelta
from uuid import UUID
from unittest.mock import patch
from botocore.exceptions import ClientError
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, delete
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
os.environ.setdefault("DATABASE_URL", "sqlite+pysqlite:///:memory:")
os.environ.setdefault("REDIS_URL", "redis://localhost:6379/0")
os.environ.setdefault("S3_ENDPOINT", "http://localhost:9000")
os.environ.setdefault("S3_ACCESS_KEY", "test")
os.environ.setdefault("S3_SECRET_KEY", "test")
os.environ.setdefault("S3_BUCKET", "test")
from app.core.config import settings
from app.core.security import create_jwt
from app.db.session import get_db
from app.main import app
from app.models.admin_user import AdminUser
from app.models.attachment import Attachment
from app.models.message import Message
from app.models.notification import Notification
from app.models.request import Request
from app.models.security_audit_log import SecurityAuditLog
class _FakeBody:
def __init__(self, payload: bytes):
self.payload = payload
def iter_chunks(self, chunk_size=65536):
for i in range(0, len(self.payload), chunk_size):
yield self.payload[i : i + chunk_size]
class _FakeS3Storage:
def __init__(self):
self.objects = {}
def get_object(self, key: str) -> dict:
obj = self.objects.get(key)
if obj is None:
raise ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "GetObject")
return {"Body": _FakeBody(obj["content"]), "ContentType": obj["mime"], "ContentLength": obj["size"]}
class SecurityAuditTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
cls.SessionLocal = sessionmaker(bind=cls.engine, autocommit=False, autoflush=False)
AdminUser.__table__.create(bind=cls.engine)
Request.__table__.create(bind=cls.engine)
Notification.__table__.create(bind=cls.engine)
Message.__table__.create(bind=cls.engine)
Attachment.__table__.create(bind=cls.engine)
SecurityAuditLog.__table__.create(bind=cls.engine)
@classmethod
def tearDownClass(cls):
SecurityAuditLog.__table__.drop(bind=cls.engine)
Attachment.__table__.drop(bind=cls.engine)
Message.__table__.drop(bind=cls.engine)
Notification.__table__.drop(bind=cls.engine)
Request.__table__.drop(bind=cls.engine)
AdminUser.__table__.drop(bind=cls.engine)
cls.engine.dispose()
def setUp(self):
with self.SessionLocal() as db:
db.execute(delete(SecurityAuditLog))
db.execute(delete(Notification))
db.execute(delete(Attachment))
db.execute(delete(Message))
db.execute(delete(Request))
db.execute(delete(AdminUser))
db.commit()
def override_get_db():
db = self.SessionLocal()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
self.client = TestClient(app)
def tearDown(self):
self.client.close()
app.dependency_overrides.clear()
@staticmethod
def _admin_headers(sub: str, role: str = "ADMIN", email: str = "admin@example.com") -> dict[str, str]:
token = create_jwt(
{"sub": sub, "email": email, "role": role},
settings.ADMIN_JWT_SECRET,
timedelta(minutes=30),
)
return {"Authorization": f"Bearer {token}"}
def test_public_attachment_download_writes_security_allow_event(self):
fake_s3 = _FakeS3Storage()
with self.SessionLocal() as db:
req = Request(
track_number="TRK-SEC-PUB-1",
client_name="Клиент",
client_phone="+79990001010",
topic_code="civil-law",
status_code="NEW",
extra_fields={},
total_attachments_bytes=0,
)
db.add(req)
db.flush()
key = f"requests/{req.id}/doc.pdf"
att = Attachment(
request_id=req.id,
message_id=None,
file_name="doc.pdf",
mime_type="application/pdf",
size_bytes=1024,
s3_key=key,
responsible="Клиент",
)
db.add(att)
db.commit()
attachment_id = str(att.id)
track_number = req.track_number
fake_s3.objects[key] = {"size": 1024, "mime": "application/pdf", "content": b"x" * 1024}
public_token = create_jwt({"sub": track_number, "purpose": "VIEW_REQUEST"}, settings.PUBLIC_JWT_SECRET, timedelta(days=1))
cookies = {settings.PUBLIC_COOKIE_NAME: public_token}
with patch("app.api.public.uploads.get_s3_storage", return_value=fake_s3):
response = self.client.get(f"/api/public/uploads/object/{attachment_id}", cookies=cookies)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, b"x" * 1024)
with self.SessionLocal() as db:
rows = (
db.query(SecurityAuditLog)
.filter(SecurityAuditLog.action == "DOWNLOAD_OBJECT", SecurityAuditLog.actor_role == "CLIENT")
.all()
)
self.assertEqual(len(rows), 1)
row = rows[0]
self.assertTrue(row.allowed)
self.assertEqual(row.object_key, key)
self.assertEqual(str(row.attachment_id), attachment_id)
self.assertEqual(row.scope, "REQUEST_ATTACHMENT")
def test_public_request_card_read_writes_pii_access_event(self):
with self.SessionLocal() as db:
req = Request(
track_number="TRK-SEC-READ-1",
client_name="Клиент",
client_phone="+79990001011",
topic_code="civil-law",
status_code="NEW",
extra_fields={},
)
db.add(req)
db.commit()
public_token = create_jwt(
{"sub": "TRK-SEC-READ-1", "purpose": "VIEW_REQUEST"},
settings.PUBLIC_JWT_SECRET,
timedelta(days=1),
)
cookies = {settings.PUBLIC_COOKIE_NAME: public_token}
response = self.client.get("/api/public/requests/TRK-SEC-READ-1", cookies=cookies)
self.assertEqual(response.status_code, 200)
with self.SessionLocal() as db:
rows = (
db.query(SecurityAuditLog)
.filter(
SecurityAuditLog.action == "READ_REQUEST_CARD",
SecurityAuditLog.scope == "REQUEST_CARD",
SecurityAuditLog.actor_role == "CLIENT",
)
.all()
)
self.assertGreaterEqual(len(rows), 1)
row = rows[-1]
self.assertTrue(row.allowed)
self.assertEqual(row.actor_subject, "TRK-SEC-READ-1")
def test_admin_object_proxy_denied_writes_security_deny_event(self):
fake_s3 = _FakeS3Storage()
with self.SessionLocal() as db:
lawyer_a = AdminUser(
role="LAWYER",
name="Юрист А",
email="sec-lawyer-a@example.com",
password_hash="hash",
is_active=True,
)
lawyer_b = AdminUser(
role="LAWYER",
name="Юрист Б",
email="sec-lawyer-b@example.com",
password_hash="hash",
is_active=True,
)
db.add_all([lawyer_a, lawyer_b])
db.flush()
req = Request(
track_number="TRK-SEC-ADM-1",
client_name="Клиент",
client_phone="+79990002020",
topic_code="civil-law",
status_code="IN_PROGRESS",
assigned_lawyer_id=str(lawyer_b.id),
extra_fields={},
total_attachments_bytes=0,
)
db.add(req)
db.flush()
key = f"requests/{req.id}/proof.pdf"
db.add(
Attachment(
request_id=req.id,
file_name="proof.pdf",
mime_type="application/pdf",
size_bytes=1024,
s3_key=key,
)
)
db.commit()
lawyer_a_id = str(lawyer_a.id)
token = self._admin_headers(sub=lawyer_a_id, role="LAWYER", email="sec-lawyer-a@example.com")["Authorization"].replace(
"Bearer ", ""
)
fake_s3.objects[key] = {"size": 1024, "mime": "application/pdf", "content": b"x" * 1024}
with patch("app.api.admin.uploads.get_s3_storage", return_value=fake_s3):
response = self.client.get(f"/api/admin/uploads/object/{key}?token={token}")
self.assertEqual(response.status_code, 403)
with self.SessionLocal() as db:
rows = (
db.query(SecurityAuditLog)
.filter(SecurityAuditLog.action == "DOWNLOAD_OBJECT", SecurityAuditLog.actor_role == "LAWYER")
.all()
)
self.assertEqual(len(rows), 1)
row = rows[0]
self.assertFalse(row.allowed)
self.assertEqual(row.object_key, key)
self.assertIn("Недостаточно прав", str(row.reason or ""))
self.assertEqual(str(row.request_id), key.split("/")[1])
UUID(str(row.id))