Law/tests/admin/test_assignment_users.py
2026-03-02 20:54:09 +03:00

429 lines
18 KiB
Python

from tests.admin.base import * # noqa: F401,F403
class AdminAssignmentAndUsersTests(AdminUniversalCrudBase):
def test_lawyer_can_claim_unassigned_request_and_takeover_is_forbidden(self):
    """Check the claim flow: the first lawyer wins, later claims are rejected.

    Seeds two lawyers and one unassigned request, then asserts that:
    * the first lawyer's claim answers 200 and reports that lawyer as assignee;
    * a claim by the second lawyer answers 409;
    * a claim made with an ADMIN token answers 403;
    * the stored request still belongs to the first lawyer and exactly one
      MANUAL_CLAIM audit record exists for it.
    """
    account_flags = {"password_hash": "hash", "is_active": True}
    with self.SessionLocal() as session:
        first_lawyer = AdminUser(
            role="LAWYER",
            name="Юрист 1",
            email="lawyer1@example.com",
            **account_flags,
        )
        second_lawyer = AdminUser(
            role="LAWYER",
            name="Юрист 2",
            email="lawyer2@example.com",
            **account_flags,
        )
        ticket = Request(
            track_number="TRK-CLAIM-1",
            client_name="Клиент",
            client_phone="+79991112233",
            status_code="NEW",
            description="claim test",
            extra_fields={},
            assigned_lawyer_id=None,
        )
        session.add_all([first_lawyer, second_lawyer, ticket])
        session.commit()
        # Ids are kept as strings: they go into URLs, token subjects and comparisons.
        first_lawyer_id = str(first_lawyer.id)
        second_lawyer_id = str(second_lawyer.id)
        ticket_id = str(ticket.id)

    first_headers = self._auth_headers("LAWYER", email="lawyer1@example.com", sub=first_lawyer_id)
    second_headers = self._auth_headers("LAWYER", email="lawyer2@example.com", sub=second_lawyer_id)
    root_headers = self._auth_headers("ADMIN", email="root@example.com")
    claim_url = f"/api/admin/requests/{ticket_id}/claim"

    # The request is free, so the first claim must succeed.
    winning_claim = self.client.post(claim_url, headers=first_headers)
    self.assertEqual(winning_claim.status_code, 200)
    self.assertEqual(winning_claim.json()["assigned_lawyer_id"], first_lawyer_id)

    # Another lawyer may not take over an already claimed request.
    takeover_attempt = self.client.post(claim_url, headers=second_headers)
    self.assertEqual(takeover_attempt.status_code, 409)

    # The claim endpoint is expected to refuse an ADMIN token.
    admin_attempt = self.client.post(claim_url, headers=root_headers)
    self.assertEqual(admin_attempt.status_code, 403)

    with self.SessionLocal() as session:
        stored = session.get(Request, UUID(ticket_id))
        self.assertIsNotNone(stored)
        self.assertEqual(stored.assigned_lawyer_id, first_lawyer_id)

        audit_criteria = (
            AuditLog.entity == "requests",
            AuditLog.entity_id == ticket_id,
            AuditLog.action == "MANUAL_CLAIM",
        )
        claim_audits = session.query(AuditLog).filter(*audit_criteria).all()
        self.assertEqual(len(claim_audits), 1)
def test_lawyer_cannot_assign_request_via_universal_crud(self):
    """Check that a LAWYER token cannot set ``assigned_lawyer_id`` directly.

    Four write attempts carrying ``assigned_lawyer_id`` are made with the
    lawyer's own token and each must answer 403: PATCH and POST on the
    universal CRUD routes, then PATCH and POST on the ``/api/admin/requests``
    routes.
    """
    with self.SessionLocal() as session:
        actor = AdminUser(
            role="LAWYER",
            name="Юрист",
            email="lawyer-assign@example.com",
            password_hash="hash",
            is_active=True,
        )
        ticket = Request(
            track_number="TRK-CLAIM-2",
            client_name="Клиент",
            client_phone="+79994445566",
            status_code="NEW",
            description="crud assign block",
            extra_fields={},
            assigned_lawyer_id=None,
        )
        session.add_all([actor, ticket])
        session.commit()
        lawyer_id = str(actor.id)
        request_id = str(ticket.id)

    headers = self._auth_headers("LAWYER", email="lawyer-assign@example.com", sub=lawyer_id)
    crud_collection_url = "/api/admin/crud/requests"
    legacy_collection_url = "/api/admin/requests"

    # Universal CRUD: update of an existing request.
    crud_patch = self.client.patch(
        f"{crud_collection_url}/{request_id}",
        headers=headers,
        json={"assigned_lawyer_id": lawyer_id},
    )
    self.assertEqual(crud_patch.status_code, 403)

    # Universal CRUD: creation with a pre-set assignee.
    crud_post_payload = {
        "client_name": "Новый клиент",
        "client_phone": "+79990001122",
        "status_code": "NEW",
        "description": "blocked create assign",
        "assigned_lawyer_id": lawyer_id,
    }
    crud_post = self.client.post(crud_collection_url, headers=headers, json=crud_post_payload)
    self.assertEqual(crud_post.status_code, 403)

    # Legacy requests route: update of an existing request.
    legacy_patch = self.client.patch(
        f"{legacy_collection_url}/{request_id}",
        headers=headers,
        json={"assigned_lawyer_id": lawyer_id},
    )
    self.assertEqual(legacy_patch.status_code, 403)

    # Legacy requests route: creation with a pre-set assignee.
    legacy_post_payload = {
        "client_name": "Legacy клиент",
        "client_phone": "+79990001123",
        "status_code": "NEW",
        "description": "legacy assign block",
        "assigned_lawyer_id": lawyer_id,
    }
    legacy_post = self.client.post(legacy_collection_url, headers=headers, json=legacy_post_payload)
    self.assertEqual(legacy_post.status_code, 403)
def test_admin_can_reassign_assigned_request(self):
    """Check that an ADMIN can move a claimed request to another lawyer.

    The source lawyer first claims the request through the API. The admin
    then posts to the reassign endpoint; the response must report both the
    previous and the new assignee, the stored row must point at the new
    lawyer, and a MANUAL_REASSIGN audit action must be recorded.
    """
    account_flags = {"password_hash": "hash", "is_active": True}
    with self.SessionLocal() as session:
        source_lawyer = AdminUser(
            role="LAWYER",
            name="Юрист Исходный",
            email="lawyer-from@example.com",
            **account_flags,
        )
        target_lawyer = AdminUser(
            role="LAWYER",
            name="Юрист Целевой",
            email="lawyer-to@example.com",
            **account_flags,
        )
        ticket = Request(
            track_number="TRK-REASSIGN-1",
            client_name="Клиент",
            client_phone="+79993334455",
            status_code="NEW",
            description="reassign test",
            extra_fields={},
            assigned_lawyer_id=None,
        )
        session.add_all([source_lawyer, target_lawyer, ticket])
        session.commit()
        source_id = str(source_lawyer.id)
        target_id = str(target_lawyer.id)
        ticket_id = str(ticket.id)

    ticket_url = f"/api/admin/requests/{ticket_id}"

    # Step 1: the source lawyer claims the request so there is something to reassign.
    source_headers = self._auth_headers("LAWYER", email="lawyer-from@example.com", sub=source_id)
    claim_response = self.client.post(f"{ticket_url}/claim", headers=source_headers)
    self.assertEqual(claim_response.status_code, 200)

    # Step 2: the admin hands the request over to the target lawyer.
    root_headers = self._auth_headers("ADMIN", email="root@example.com")
    reassign_response = self.client.post(
        f"{ticket_url}/reassign",
        headers=root_headers,
        json={"lawyer_id": target_id},
    )
    self.assertEqual(reassign_response.status_code, 200)
    payload = reassign_response.json()
    self.assertEqual(payload["from_lawyer_id"], source_id)
    self.assertEqual(payload["assigned_lawyer_id"], target_id)

    # Step 3: verify persistence and the audit trail.
    with self.SessionLocal() as session:
        stored = session.get(Request, UUID(ticket_id))
        self.assertIsNotNone(stored)
        self.assertEqual(stored.assigned_lawyer_id, target_id)

        audit_rows = (
            session.query(AuditLog)
            .filter(AuditLog.entity == "requests", AuditLog.entity_id == ticket_id)
            .all()
        )
        recorded_actions = [entry.action for entry in audit_rows]
        self.assertIn("MANUAL_REASSIGN", recorded_actions)
def test_reassign_is_admin_only_and_validates_request_state(self):
    """Check the reassign endpoint's access control and input validation.

    Asserted outcomes:
    * a LAWYER token gets 403;
    * an ADMIN reassigning a request that has no assignee gets 400;
    * an ADMIN reassigning a request to its current assignee gets 400.
    """
    account_flags = {"password_hash": "hash", "is_active": True}
    with self.SessionLocal() as session:
        owner = AdminUser(
            role="LAWYER",
            name="Юрист Один",
            email="lawyer-one@example.com",
            **account_flags,
        )
        colleague = AdminUser(
            role="LAWYER",
            name="Юрист Два",
            email="lawyer-two@example.com",
            **account_flags,
        )
        session.add_all([owner, colleague])
        # Flush before reading the ids: the owner's id is needed to seed the
        # pre-assigned request below.
        session.flush()
        lawyer1_id = str(owner.id)
        lawyer2_id = str(colleague.id)

        free_ticket = Request(
            track_number="TRK-REASSIGN-2",
            client_name="Клиент",
            client_phone="+79995556677",
            status_code="NEW",
            description="reassign invalid",
            extra_fields={},
            assigned_lawyer_id=None,
        )
        owned_ticket = Request(
            track_number="TRK-REASSIGN-3",
            client_name="Клиент",
            client_phone="+79995556678",
            status_code="NEW",
            description="reassign invalid same",
            extra_fields={},
            assigned_lawyer_id=lawyer1_id,
        )
        session.add_all([free_ticket, owned_ticket])
        session.commit()
        unassigned_id = str(free_ticket.id)
        assigned_id = str(owned_ticket.id)

    root_headers = self._auth_headers("ADMIN", email="root@example.com")
    owner_headers = self._auth_headers("LAWYER", email="lawyer-one@example.com", sub=lawyer1_id)
    owned_url = f"/api/admin/requests/{assigned_id}/reassign"
    free_url = f"/api/admin/requests/{unassigned_id}/reassign"

    # A lawyer is not allowed to reassign, even a request assigned to them.
    by_lawyer = self.client.post(owned_url, headers=owner_headers, json={"lawyer_id": lawyer2_id})
    self.assertEqual(by_lawyer.status_code, 403)

    # A request without an assignee cannot be reassigned.
    on_unassigned = self.client.post(free_url, headers=root_headers, json={"lawyer_id": lawyer2_id})
    self.assertEqual(on_unassigned.status_code, 400)

    # Reassigning to the lawyer who already holds the request is rejected.
    to_same_lawyer = self.client.post(owned_url, headers=root_headers, json={"lawyer_id": lawyer1_id})
    self.assertEqual(to_same_lawyer.status_code, 400)
def test_responsible_is_protected_from_manual_input(self):
    """Check that a client-supplied ``responsible`` field is rejected.

    Creating a quote with ``responsible`` in the payload must answer 400 and
    the ``detail`` of the response must mention "Неизвестные поля".
    """
    admin_headers = self._auth_headers("ADMIN")
    forged_payload = {"author": "A", "text": "B", "responsible": "hacker@example.com"}

    result = self.client.post("/api/admin/crud/quotes", headers=admin_headers, json=forged_payload)

    self.assertEqual(result.status_code, 400)
    detail = result.json().get("detail", "")
    self.assertIn("Неизвестные поля", detail)
def test_calculated_fields_are_read_only_for_universal_crud(self):
    """Check that calculated request/invoice fields cannot be written via CRUD.

    Asserted outcomes:
    * POST with ``invoice_amount`` answers 400 with "Неизвестные поля";
    * a plain POST without such fields answers 201;
    * PATCH with ``paid_at`` on the created request answers 400 with
      "Неизвестные поля";
    * the tables meta endpoint lists the calculated columns for ``requests``
      and ``invoices`` and marks each of them with a falsy ``editable``.
    """
    headers = self._auth_headers("ADMIN", email="root@example.com")
    requests_url = "/api/admin/crud/requests"
    unknown_fields_marker = "Неизвестные поля"

    # Create attempt that tries to set a calculated field.
    rejected_create = self.client.post(
        requests_url,
        headers=headers,
        json={
            "client_name": "Клиент readonly",
            "client_phone": "+79995550011",
            "status_code": "NEW",
            "description": "calc readonly",
            "invoice_amount": 12500,
        },
    )
    self.assertEqual(rejected_create.status_code, 400)
    self.assertIn(unknown_fields_marker, rejected_create.json().get("detail", ""))

    # A valid create gives us a row to patch.
    accepted_create = self.client.post(
        requests_url,
        headers=headers,
        json={
            "client_name": "Клиент readonly",
            "client_phone": "+79995550012",
            "status_code": "NEW",
            "description": "valid create",
        },
    )
    self.assertEqual(accepted_create.status_code, 201)
    request_id = accepted_create.json()["id"]

    # Update attempt that tries to set a calculated field.
    rejected_patch = self.client.patch(
        f"{requests_url}/{request_id}",
        headers=headers,
        json={"paid_at": "2026-02-24T12:00:00+03:00"},
    )
    self.assertEqual(rejected_patch.status_code, 400)
    self.assertIn(unknown_fields_marker, rejected_patch.json().get("detail", ""))

    # The metadata endpoint must advertise the same columns as non-editable.
    meta = self.client.get("/api/admin/crud/meta/tables", headers=headers)
    self.assertEqual(meta.status_code, 200)
    tables_by_name = {entry["table"]: entry for entry in (meta.json().get("tables") or [])}

    def columns_by_name(table_name):
        # Missing tables or missing column lists degrade to an empty mapping.
        table_meta = tables_by_name.get(table_name, {})
        return {column["name"]: column for column in (table_meta.get("columns") or [])}

    request_columns = columns_by_name("requests")
    request_calculated = ("invoice_amount", "paid_at", "paid_by_admin_id", "total_attachments_bytes")
    for column_name in request_calculated:
        self.assertIn(column_name, request_columns)
    for column_name in request_calculated:
        self.assertFalse(request_columns[column_name]["editable"])

    invoice_columns = columns_by_name("invoices")
    invoice_calculated = ("issued_at", "paid_at")
    for column_name in invoice_calculated:
        self.assertIn(column_name, invoice_columns)
    for column_name in invoice_calculated:
        self.assertFalse(invoice_columns[column_name]["editable"])
def test_topic_code_is_autogenerated_when_missing(self):
    """Check that topics created without ``code`` receive a generated one.

    The same topic name is posted twice without a code. Each response must be
    201 with a non-empty ``code`` matching ``^[a-z0-9-]+$``, and the two codes
    must differ from each other.
    """
    admin_headers = self._auth_headers("ADMIN")
    generated_codes = []

    for _ in range(2):
        created = self.client.post(
            "/api/admin/crud/topics",
            headers=admin_headers,
            json={"name": "Семейное право"},
        )
        self.assertEqual(created.status_code, 201)
        topic = created.json()
        self.assertTrue(topic.get("code"))
        self.assertRegex(topic["code"], r"^[a-z0-9-]+$")
        generated_codes.append(topic["code"])

    first_code, second_code = generated_codes
    self.assertNotEqual(first_code, second_code)
def test_topic_sort_order_is_assigned_as_next_max_on_create(self):
    """Check that ``sort_order`` is assigned sequentially on topic creation.

    The first topic is posted with ``sort_order`` 999 and is expected to come
    back with 1; the second topic is posted without ``sort_order`` and is
    expected to come back with 2.
    """
    admin_headers = self._auth_headers("ADMIN")
    submissions = (
        {"name": "Первая тема", "sort_order": 999},
        {"name": "Вторая тема"},
    )

    for expected_position, topic_payload in enumerate(submissions, start=1):
        created = self.client.post("/api/admin/crud/topics", headers=admin_headers, json=topic_payload)
        self.assertEqual(created.status_code, 201)
        # A missing or null sort_order is normalised to 0 before comparing.
        assigned_position = int(created.json().get("sort_order") or 0)
        self.assertEqual(assigned_position, expected_position)
def test_admin_can_manage_users_with_password_hashing(self):
    """Exercise the admin_users CRUD life cycle, focusing on password handling.

    Steps and asserted outcomes:
    1. A topic ``civil-law`` is created so the user can reference it.
    2. A LAWYER user is created with a plain ``password``; the response must
       carry a lower-cased email, echo role/avatar/topic, and must not
       contain ``password_hash``. The stored hash must verify against the
       submitted password.
    3. The user is patched (role, password, ``is_active``, and empty strings
       for topic and avatar); the response must show the empty strings as
       ``None`` and the stored hash must match only the new password.
    4. The query endpoint must not expose ``password_hash`` in any row.
    5. Writing ``password_hash`` directly must answer 400.
    6. Deleting the user with a token whose ``sub`` is that same user must
       answer 400, while deletion with the root admin token answers 200.
    """
    headers = self._auth_headers("ADMIN", email="root@example.com")
    users_url = "/api/admin/crud/admin_users"

    topic_create = self.client.post(
        "/api/admin/crud/topics",
        headers=headers,
        json={"code": "civil-law", "name": "Гражданское право"},
    )
    self.assertEqual(topic_create.status_code, 201)

    created = self.client.post(
        users_url,
        headers=headers,
        json={
            "name": "Юрист Тестовый",
            "email": "Lawyer.TEST@Example.com",
            "role": "LAWYER",
            "primary_topic_code": "civil-law",
            "avatar_url": "https://cdn.example.com/avatars/lawyer-test.png",
            "password": "StartPass-123",
            "is_active": True,
        },
    )
    self.assertEqual(created.status_code, 201)
    body = created.json()
    self.assertEqual(body["email"], "lawyer.test@example.com")
    self.assertEqual(body["role"], "LAWYER")
    self.assertEqual(body["avatar_url"], "https://cdn.example.com/avatars/lawyer-test.png")
    self.assertEqual(body["primary_topic_code"], "civil-law")
    self.assertNotIn("password_hash", body)
    user_id = body["id"]
    # Format check only: UUID() raises ValueError when the id is not a valid UUID string.
    UUID(user_id)
    user_url = f"{users_url}/{user_id}"

    with self.SessionLocal() as db:
        user = db.get(AdminUser, UUID(user_id))
        self.assertIsNotNone(user)
        self.assertTrue(verify_password("StartPass-123", user.password_hash))

    updated = self.client.patch(
        user_url,
        headers=headers,
        json={"role": "ADMIN", "password": "UpdatedPass-999", "is_active": False, "primary_topic_code": "", "avatar_url": ""},
    )
    self.assertEqual(updated.status_code, 200)
    upd_body = updated.json()
    self.assertEqual(upd_body["role"], "ADMIN")
    self.assertIsNone(upd_body["avatar_url"])
    self.assertIsNone(upd_body["primary_topic_code"])
    self.assertFalse(upd_body["is_active"])
    self.assertNotIn("password_hash", upd_body)

    with self.SessionLocal() as db:
        user = db.get(AdminUser, UUID(user_id))
        self.assertIsNotNone(user)
        self.assertTrue(verify_password("UpdatedPass-999", user.password_hash))
        self.assertFalse(verify_password("StartPass-123", user.password_hash))

    q = self.client.post(
        f"{users_url}/query",
        headers=headers,
        json={"filters": [], "sort": [{"field": "created_at", "dir": "desc"}], "page": {"limit": 50, "offset": 0}},
    )
    self.assertEqual(q.status_code, 200)
    listing = q.json()
    self.assertGreaterEqual(listing["total"], 1)
    rows = listing["rows"]
    # Fail with an assertion (not an IndexError) if the page is unexpectedly empty.
    self.assertTrue(rows)
    # Every returned row is inspected: a hash leaking from any user is a failure,
    # not only one leaking from the first row of the page.
    for row in rows:
        self.assertNotIn("password_hash", row)

    blocked_hash_write = self.client.patch(
        user_url,
        headers=headers,
        json={"password_hash": "forged"},
    )
    self.assertEqual(blocked_hash_write.status_code, 400)

    # A token whose subject is the user being deleted must not be able to delete it.
    self_headers = self._auth_headers("ADMIN", email="self@example.com", sub=user_id)
    self_delete = self.client.delete(user_url, headers=self_headers)
    self.assertEqual(self_delete.status_code, 400)

    deleted = self.client.delete(user_url, headers=headers)
    self.assertEqual(deleted.status_code, 200)