"""Tests for request notifications: API-driven flows and the SLA worker task."""

import os
import unittest
from datetime import datetime, timedelta, timezone
from uuid import UUID
from unittest.mock import patch

from botocore.exceptions import ClientError
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, delete
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

# These defaults are deliberately set *before* the ``app.*`` imports below.
# NOTE(review): presumably ``app.core.config`` reads them at import time --
# confirm before moving the imports above this block.
os.environ.setdefault("DATABASE_URL", "sqlite+pysqlite:///:memory:")
os.environ.setdefault("REDIS_URL", "redis://localhost:6379/0")
os.environ.setdefault("S3_ENDPOINT", "http://localhost:9000")
os.environ.setdefault("S3_ACCESS_KEY", "test")
os.environ.setdefault("S3_SECRET_KEY", "test")
os.environ.setdefault("S3_BUCKET", "test")

from app.core.config import settings
from app.core.security import create_jwt
from app.db.session import get_db
from app.main import app
from app.models.admin_user import AdminUser
from app.models.attachment import Attachment
from app.models.message import Message
from app.models.notification import Notification
from app.models.request import Request
from app.models.status import Status
from app.models.status_history import StatusHistory
from app.models.topic_status_transition import TopicStatusTransition
from app.services.chat_secure_service import create_admin_or_lawyer_message
from app.services.notifications import EVENT_REQUEST_DATA, notify_request_event
from app.workers.tasks import sla as sla_task


def _make_admin_user(role: str, name: str, email: str) -> AdminUser:
    """Build an unsaved, active ``AdminUser`` with a placeholder password hash."""
    return AdminUser(
        role=role,
        name=name,
        email=email,
        password_hash="hash",
        is_active=True,
    )


def _make_request(
    track_number: str,
    client_phone: str,
    description: str,
    status_code: str = "NEW",
    **extra,
) -> Request:
    """Build an unsaved ``Request`` on the ``civil`` topic.

    ``extra`` is forwarded verbatim to the ``Request`` constructor, so a test
    passes ``assigned_lawyer_id`` / ``created_at`` / ``updated_at`` only when it
    actually needs them.
    """
    return Request(
        track_number=track_number,
        client_name="Клиент",
        client_phone=client_phone,
        topic_code="civil",
        status_code=status_code,
        description=description,
        extra_fields={},
        **extra,
    )


class _FakeS3Storage:
    """In-memory stand-in for the S3 storage object used by the upload API.

    Tests register uploaded objects in ``objects`` as
    ``{key: {"size": ..., "mime": ...}}``.
    """

    def __init__(self):
        self.objects = {}

    def create_presigned_put_url(self, key: str, mime_type: str, expires_sec: int = 900) -> str:
        """Return a fake upload URL; ``mime_type`` is accepted but not used."""
        return f"https://s3.local/{key}?expires={expires_sec}"

    def head_object(self, key: str) -> dict:
        """Return size/type metadata for ``key``.

        Raises:
            ClientError: with error code ``"404"`` when ``key`` was never
                registered in ``objects``.
        """
        obj = self.objects.get(key)
        if obj is None:
            raise ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "HeadObject")
        return {"ContentLength": obj["size"], "ContentType": obj["mime"]}


class _InMemoryTablesTestCase(unittest.TestCase):
    """Shared fixture: one in-memory SQLite engine per test class.

    Subclasses list the models they need in ``_TABLES`` in creation order.
    Tables are created in that order and dropped / emptied in reverse, so the
    list is maintained in exactly one place.
    """

    # ORM model classes whose tables this test class needs, in creation order.
    _TABLES: tuple = ()

    @classmethod
    def setUpClass(cls):
        # StaticPool makes every session share the single connection, which is
        # what keeps the in-memory database alive across sessions.
        cls.engine = create_engine(
            "sqlite+pysqlite:///:memory:",
            connect_args={"check_same_thread": False},
            poolclass=StaticPool,
        )
        cls.SessionLocal = sessionmaker(bind=cls.engine, autocommit=False, autoflush=False)
        for model in cls._TABLES:
            model.__table__.create(bind=cls.engine)

    @classmethod
    def tearDownClass(cls):
        for model in reversed(cls._TABLES):
            model.__table__.drop(bind=cls.engine)
        cls.engine.dispose()

    def _truncate_tables(self) -> None:
        """Delete all rows from every table in ``_TABLES`` (reverse order)."""
        with self.SessionLocal() as db:
            for model in reversed(self._TABLES):
                db.execute(delete(model))
            db.commit()


class NotificationFlowTests(_InMemoryTablesTestCase):
    """Notification behaviour exercised through the HTTP API and service calls."""

    _TABLES = (
        AdminUser,
        Request,
        Message,
        Attachment,
        TopicStatusTransition,
        StatusHistory,
        Notification,
    )

    def setUp(self):
        self._truncate_tables()

        def override_get_db():
            db = self.SessionLocal()
            try:
                yield db
            finally:
                db.close()

        # Cleanups run in LIFO order and also run when setUp itself fails
        # part-way, so the dependency override can never leak into other tests.
        app.dependency_overrides[get_db] = override_get_db
        self.addCleanup(app.dependency_overrides.clear)
        self.client = TestClient(app)
        self.addCleanup(self.client.close)

    @staticmethod
    def _admin_headers(sub: str, role: str, email: str) -> dict[str, str]:
        """Return an ``Authorization`` header carrying a 30-minute admin JWT."""
        token = create_jwt(
            {"sub": sub, "email": email, "role": role},
            settings.ADMIN_JWT_SECRET,
            timedelta(minutes=30),
        )
        return {"Authorization": f"Bearer {token}"}

    @staticmethod
    def _public_cookies(track_number: str) -> dict[str, str]:
        """Return the public cookie carrying a one-day ``VIEW_REQUEST`` JWT."""
        token = create_jwt(
            {"sub": track_number, "purpose": "VIEW_REQUEST"},
            settings.PUBLIC_JWT_SECRET,
            timedelta(days=1),
        )
        return {settings.PUBLIC_COOKIE_NAME: token}

    def test_public_message_creates_internal_notification_for_lawyer(self):
        """A public message yields one unread MESSAGE notification the lawyer can mark read."""
        with self.SessionLocal() as db:
            lawyer = _make_admin_user("LAWYER", "Юрист", "lawyer@example.com")
            db.add(lawyer)
            db.flush()
            req = _make_request(
                "TRK-NOTIF-MSG",
                "+79990000001",
                "notification",
                assigned_lawyer_id=str(lawyer.id),
            )
            db.add(req)
            db.commit()
            lawyer_id = str(lawyer.id)

        created = self.client.post(
            "/api/public/requests/TRK-NOTIF-MSG/messages",
            cookies=self._public_cookies("TRK-NOTIF-MSG"),
            json={"body": "Есть новое сообщение"},
        )
        self.assertEqual(created.status_code, 201)

        with self.SessionLocal() as db:
            rows = db.query(Notification).all()
            self.assertEqual(len(rows), 1)
            self.assertEqual(rows[0].event_type, "MESSAGE")
            self.assertEqual(str(rows[0].recipient_admin_user_id), lawyer_id)
            self.assertFalse(rows[0].is_read)
            notif_id = str(rows[0].id)

        headers = self._admin_headers(lawyer_id, "LAWYER", "lawyer@example.com")
        listed = self.client.get("/api/admin/notifications", headers=headers)
        self.assertEqual(listed.status_code, 200)
        self.assertEqual(listed.json()["total"], 1)
        self.assertEqual(listed.json()["unread_total"], 1)

        marked = self.client.post(f"/api/admin/notifications/{notif_id}/read", headers=headers)
        self.assertEqual(marked.status_code, 200)
        self.assertEqual(marked.json()["changed"], 1)

        unread = self.client.get("/api/admin/notifications?unread_only=true", headers=headers)
        self.assertEqual(unread.status_code, 200)
        self.assertEqual(unread.json()["total"], 0)

    def test_admin_status_change_creates_client_notification_and_open_marks_read(self):
        """An admin status change notifies the client; opening the request clears unread."""
        with self.SessionLocal() as db:
            req = _make_request("TRK-NOTIF-STATUS", "+79990000002", "notification status")
            db.add(req)
            db.commit()
            request_id = str(req.id)

        headers = self._admin_headers(sub=str(UUID(int=1)), role="ADMIN", email="admin@example.com")
        updated = self.client.patch(
            f"/api/admin/requests/{request_id}",
            headers=headers,
            json={"status_code": "IN_PROGRESS"},
        )
        self.assertEqual(updated.status_code, 200)

        listed = self.client.get(
            "/api/public/requests/TRK-NOTIF-STATUS/notifications",
            cookies=self._public_cookies("TRK-NOTIF-STATUS"),
        )
        self.assertEqual(listed.status_code, 200)
        self.assertEqual(listed.json()["total"], 1)
        self.assertEqual(listed.json()["rows"][0]["event_type"], "STATUS")
        self.assertEqual(listed.json()["unread_total"], 1)

        opened = self.client.get(
            "/api/public/requests/TRK-NOTIF-STATUS",
            cookies=self._public_cookies("TRK-NOTIF-STATUS"),
        )
        self.assertEqual(opened.status_code, 200)

        unread = self.client.get(
            "/api/public/requests/TRK-NOTIF-STATUS/notifications?unread_only=true",
            cookies=self._public_cookies("TRK-NOTIF-STATUS"),
        )
        self.assertEqual(unread.status_code, 200)
        self.assertEqual(unread.json()["total"], 0)

    def test_public_attachment_creates_lawyer_notification(self):
        """Completing a public upload creates one ATTACHMENT notification for the lawyer."""
        fake_s3 = _FakeS3Storage()
        with self.SessionLocal() as db:
            lawyer = _make_admin_user("LAWYER", "Юрист", "lawyer-file@example.com")
            db.add(lawyer)
            db.flush()
            req = _make_request(
                "TRK-NOTIF-FILE",
                "+79990000003",
                "notification file",
                assigned_lawyer_id=str(lawyer.id),
            )
            db.add(req)
            db.commit()
            request_id = str(req.id)
            lawyer_id = str(lawyer.id)

        # The init and complete calls send the same file description.
        file_meta = {
            "file_name": "doc.pdf",
            "mime_type": "application/pdf",
            "size_bytes": 1024,
            "scope": "REQUEST_ATTACHMENT",
            "request_id": request_id,
        }
        with patch("app.api.public.uploads.get_s3_storage", return_value=fake_s3):
            init_resp = self.client.post(
                "/api/public/uploads/init",
                cookies=self._public_cookies("TRK-NOTIF-FILE"),
                json=dict(file_meta),
            )
            self.assertEqual(init_resp.status_code, 200)
            key = init_resp.json()["key"]

            # Simulate the client having PUT the object to storage.
            fake_s3.objects[key] = {"size": 1024, "mime": "application/pdf"}

            complete = self.client.post(
                "/api/public/uploads/complete",
                cookies=self._public_cookies("TRK-NOTIF-FILE"),
                json={"key": key, **file_meta},
            )
            self.assertEqual(complete.status_code, 200)

        with self.SessionLocal() as db:
            rows = db.query(Notification).filter(Notification.event_type == "ATTACHMENT").all()
            self.assertEqual(len(rows), 1)
            self.assertEqual(str(rows[0].recipient_admin_user_id), lawyer_id)

    def test_admin_reassign_creates_reassignment_notifications_and_unread_markers(self):
        """Reassignment notifies the client and the new lawyer and sets unread markers."""
        with self.SessionLocal() as db:
            admin = _make_admin_user("ADMIN", "Админ", "root.reassign@example.com")
            lawyer_old = _make_admin_user("LAWYER", "Юрист Старый", "lawyer.old@example.com")
            lawyer_new = _make_admin_user("LAWYER", "Юрист Новый", "lawyer.new@example.com")
            db.add_all([admin, lawyer_old, lawyer_new])
            db.flush()
            req = _make_request(
                "TRK-NOTIF-REASSIGN",
                "+79990000031",
                "reassign notification",
                status_code="IN_PROGRESS",
                assigned_lawyer_id=str(lawyer_old.id),
            )
            db.add(req)
            db.commit()
            request_id = str(req.id)
            admin_id = str(admin.id)
            new_lawyer_id = str(lawyer_new.id)

        headers = self._admin_headers(admin_id, "ADMIN", "root.reassign@example.com")
        resp = self.client.post(
            f"/api/admin/requests/{request_id}/reassign",
            headers=headers,
            json={"lawyer_id": new_lawyer_id},
        )
        self.assertEqual(resp.status_code, 200)

        with self.SessionLocal() as db:
            rows = (
                db.query(Notification)
                .filter(
                    Notification.request_id == UUID(request_id),
                    Notification.event_type == "REASSIGNMENT",
                )
                .all()
            )
            self.assertGreaterEqual(len(rows), 2)
            self.assertTrue(
                any(str(row.recipient_track_number or "").upper() == "TRK-NOTIF-REASSIGN" for row in rows)
            )
            self.assertTrue(any(str(row.recipient_admin_user_id or "") == new_lawyer_id for row in rows))

            req = db.get(Request, UUID(request_id))
            self.assertIsNotNone(req)
            self.assertTrue(bool(req.client_has_unread_updates))
            self.assertEqual(str(req.client_unread_event_type or "").upper(), "REASSIGNMENT")
            self.assertTrue(bool(req.lawyer_has_unread_updates))
            self.assertEqual(str(req.lawyer_unread_event_type or "").upper(), "REASSIGNMENT")

    def test_request_data_event_from_client_notifies_lawyer_and_admin(self):
        """A client REQUEST_DATA event creates notifications for both admin and lawyer."""
        with self.SessionLocal() as db:
            admin = _make_admin_user("ADMIN", "Админ", "root.data@example.com")
            lawyer = _make_admin_user("LAWYER", "Юрист", "lawyer.data@example.com")
            db.add_all([admin, lawyer])
            db.flush()
            req = _make_request(
                "TRK-NOTIF-REQDATA",
                "+79990000032",
                "request data notification",
                status_code="IN_PROGRESS",
                assigned_lawyer_id=str(lawyer.id),
            )
            db.add(req)
            db.flush()

            result = notify_request_event(
                db,
                request=req,
                event_type=EVENT_REQUEST_DATA,
                actor_role="CLIENT",
                body="Клиент обновил доп. данные",
                responsible="Клиент",
                send_telegram=False,
            )
            db.commit()

            self.assertEqual(int(result.get("internal_created") or 0), 2)
            rows = db.query(Notification).filter(Notification.event_type == "REQUEST_DATA").all()
            self.assertEqual(len(rows), 2)
            self.assertTrue(any(str(row.recipient_admin_user_id or "") == str(admin.id) for row in rows))
            self.assertTrue(any(str(row.recipient_admin_user_id or "") == str(lawyer.id) for row in rows))

    def test_client_event_notifies_participant_lawyer_when_request_unassigned(self):
        """With no assigned lawyer, a lawyer who messaged on the request is notified."""
        with self.SessionLocal() as db:
            lawyer = _make_admin_user("LAWYER", "Юрист Участник", "lawyer.participant@example.com")
            db.add(lawyer)
            db.flush()
            req = _make_request(
                "TRK-NOTIF-PARTICIPANT",
                "+79990000033",
                "participant notification",
                status_code="IN_PROGRESS",
                assigned_lawyer_id=None,
            )
            db.add(req)
            db.commit()

            # The lawyer takes part in the request by posting a message.
            create_admin_or_lawyer_message(
                db,
                request=req,
                body="Сообщение юриста",
                actor_role="LAWYER",
                actor_name="Юрист Участник",
                actor_admin_user_id=str(lawyer.id),
                event_type="MESSAGE",
            )

            result = notify_request_event(
                db,
                request=req,
                event_type=EVENT_REQUEST_DATA,
                actor_role="CLIENT",
                body="Клиент обновил данные",
                responsible="Клиент",
                send_telegram=False,
            )
            db.commit()

            self.assertEqual(int(result.get("internal_created") or 0), 1)
            rows = db.query(Notification).filter(Notification.event_type == "REQUEST_DATA").all()
            self.assertEqual(len(rows), 1)
            self.assertEqual(str(rows[0].recipient_admin_user_id), str(lawyer.id))

    def test_client_event_stops_notifying_old_lawyer_after_reassign(self):
        """Once a lawyer is assigned, only that lawyer is notified, not the earlier participant."""
        with self.SessionLocal() as db:
            old_lawyer = _make_admin_user(
                "LAWYER", "Юрист Старый Участник", "lawyer.old.participant@example.com"
            )
            new_lawyer = _make_admin_user(
                "LAWYER", "Юрист Новый Назначенный", "lawyer.new.assigned@example.com"
            )
            db.add_all([old_lawyer, new_lawyer])
            db.flush()
            req = _make_request(
                "TRK-NOTIF-PARTICIPANT-REASSIGN",
                "+79990000034",
                "participant reassignment notification",
                status_code="IN_PROGRESS",
                assigned_lawyer_id=None,
            )
            db.add(req)
            db.commit()

            create_admin_or_lawyer_message(
                db,
                request=req,
                body="Сообщение старого юриста",
                actor_role="LAWYER",
                actor_name="Юрист Старый Участник",
                actor_admin_user_id=str(old_lawyer.id),
                event_type="MESSAGE",
            )

            # Assign the request to a different lawyer before the client event.
            req.assigned_lawyer_id = str(new_lawyer.id)
            db.add(req)
            db.commit()

            result = notify_request_event(
                db,
                request=req,
                event_type=EVENT_REQUEST_DATA,
                actor_role="CLIENT",
                body="Клиент обновил данные",
                responsible="Клиент",
                send_telegram=False,
            )
            db.commit()

            self.assertEqual(int(result.get("internal_created") or 0), 1)
            rows = db.query(Notification).filter(Notification.event_type == "REQUEST_DATA").all()
            self.assertEqual(len(rows), 1)
            self.assertEqual(str(rows[0].recipient_admin_user_id), str(new_lawyer.id))


class NotificationSlaTests(_InMemoryTablesTestCase):
    """SLA worker task tests, run against the in-memory test database."""

    _TABLES = (
        AdminUser,
        Request,
        Status,
        Message,
        TopicStatusTransition,
        StatusHistory,
        Notification,
    )

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Point the SLA task module at the test session factory; the original
        # is kept so tearDownClass can restore it.
        cls._old_sla_session_local = sla_task.SessionLocal
        sla_task.SessionLocal = cls.SessionLocal

    @classmethod
    def tearDownClass(cls):
        sla_task.SessionLocal = cls._old_sla_session_local
        super().tearDownClass()

    def setUp(self):
        self._truncate_tables()

    def test_sla_overdue_notifications_are_deduplicated(self):
        """A second ``sla_check`` run creates no further SLA_OVERDUE notifications."""
        now = datetime.now(timezone.utc)
        with self.SessionLocal() as db:
            admin = _make_admin_user("ADMIN", "Админ", "root@example.com")
            lawyer = _make_admin_user("LAWYER", "Юрист", "lawyer-sla@example.com")
            db.add_all([admin, lawyer])
            db.flush()

            db.add(Status(code="NEW", name="Новая", enabled=True, sort_order=0, is_terminal=False))
            db.add(Status(code="IN_PROGRESS", name="В работе", enabled=True, sort_order=1, is_terminal=False))
            db.add(
                TopicStatusTransition(
                    topic_code="civil",
                    from_status="NEW",
                    to_status="IN_PROGRESS",
                    enabled=True,
                    sla_hours=1,
                    sort_order=1,
                )
            )
            # Timestamps are two hours old while the transition's sla_hours is 1.
            req = _make_request(
                "TRK-NOTIF-SLA",
                "+79990000009",
                "sla",
                assigned_lawyer_id=str(lawyer.id),
                created_at=now - timedelta(hours=2),
                updated_at=now - timedelta(hours=2),
            )
            db.add(req)
            db.commit()

        first = sla_task.sla_check()
        second = sla_task.sla_check()

        self.assertGreaterEqual(first.get("notifications_created", 0), 2)
        self.assertEqual(second.get("notifications_created", 0), 0)

        with self.SessionLocal() as db:
            rows = db.query(Notification).filter(Notification.event_type == "SLA_OVERDUE").all()
            self.assertGreaterEqual(len(rows), 2)