Law/app/api/admin/crud_modules/payloads.py
2026-03-02 20:54:09 +03:00

641 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import uuid
from typing import Any
from fastapi import HTTPException
from sqlalchemy import func
from sqlalchemy.inspection import inspect as sa_inspect
from sqlalchemy.orm import Session
from app.core.security import hash_password
from app.models.admin_user import AdminUser
from app.models.client import Client
from app.models.form_field import FormField
from app.models.request import Request
from app.models.request_data_requirement import RequestDataRequirement
from app.models.request_data_template import RequestDataTemplate
from app.models.request_data_template_item import RequestDataTemplateItem
from app.models.status import Status
from app.models.status_group import StatusGroup
from app.models.topic import Topic
from app.models.topic_data_template import TopicDataTemplate
from app.services.billing_flow import normalize_status_kind_or_400
from .access import ALLOWED_ADMIN_ROLES, ALLOWED_REQUEST_DATA_VALUE_TYPES
from .meta import SYSTEM_FIELDS, _columns_map, _protected_input_fields
def _sanitize_payload(
model: type,
table_name: str,
payload: dict[str, Any],
*,
is_update: bool,
allow_protected_fields: set[str] | None = None,
) -> dict[str, Any]:
if not isinstance(payload, dict):
raise HTTPException(status_code=400, detail="Тело запроса должно быть JSON-объектом")
columns = _columns_map(model)
allowed_hidden = set(allow_protected_fields or set())
mutable_columns = {
name
for name in columns.keys()
if name not in SYSTEM_FIELDS and (name not in _protected_input_fields(table_name) or name in allowed_hidden)
}
unknown_fields = sorted(set(payload.keys()) - mutable_columns)
if unknown_fields:
raise HTTPException(status_code=400, detail="Неизвестные поля: " + ", ".join(unknown_fields))
cleaned: dict[str, Any] = {}
for key, value in payload.items():
column = columns[key]
if value is None and not column.nullable:
raise HTTPException(status_code=400, detail=f'Поле "{key}" не может быть null')
cleaned[key] = value
if is_update:
if not cleaned:
raise HTTPException(status_code=400, detail="Нет полей для обновления")
return cleaned
required_missing: list[str] = []
for name, column in columns.items():
if name in SYSTEM_FIELDS:
continue
if column.nullable:
continue
if column.default is not None or column.server_default is not None:
continue
if name not in cleaned:
required_missing.append(name)
if required_missing:
raise HTTPException(status_code=400, detail="Отсутствуют обязательные поля: " + ", ".join(sorted(required_missing)))
return cleaned
def _pk_value(model: type, row_id: str) -> Any:
pk = sa_inspect(model).primary_key
if len(pk) != 1:
raise HTTPException(status_code=400, detail="Поддерживаются только таблицы с одним первичным ключом")
pk_column = pk[0]
try:
python_type = pk_column.type.python_type
except Exception:
python_type = str
if python_type is uuid.UUID:
try:
return uuid.UUID(str(row_id))
except ValueError:
raise HTTPException(status_code=400, detail="Некорректный идентификатор")
return row_id
def _load_row_or_404(db: Session, model: type, row_id: str):
entity = db.get(model, _pk_value(model, row_id))
if entity is None:
raise HTTPException(status_code=404, detail="Запись не найдена")
return entity
def _prepare_create_payload(table_name: str, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if table_name == "requests":
track_number = str(data.get("track_number") or "").strip()
data["track_number"] = track_number or f"TRK-{uuid.uuid4().hex[:10].upper()}"
if data.get("extra_fields") is None:
data["extra_fields"] = {}
return data
def _normalize_optional_string(value: Any) -> str | None:
text = str(value or "").strip()
return text or None
def _normalize_client_phone(value: Any) -> str:
text = str(value or "").strip()
if not text:
return ""
allowed = {"+", "(", ")", "-", " "}
return "".join(ch for ch in text if ch.isdigit() or ch in allowed).strip()
def _upsert_client_or_400(db: Session, *, full_name: Any, phone: Any, responsible: str) -> Client:
normalized_phone = _normalize_client_phone(phone)
if not normalized_phone:
raise HTTPException(status_code=400, detail='Поле "client_phone" обязательно')
normalized_name = str(full_name or "").strip() or "Клиент"
row = db.query(Client).filter(Client.phone == normalized_phone).first()
if row is None:
row = Client(
full_name=normalized_name,
phone=normalized_phone,
responsible=responsible or "Администратор системы",
)
db.add(row)
db.flush()
return row
changed = False
if normalized_name and row.full_name != normalized_name:
row.full_name = normalized_name
changed = True
if responsible and row.responsible != responsible:
row.responsible = responsible
changed = True
if changed:
db.add(row)
db.flush()
return row
def _request_for_uuid_or_400(db: Session, raw_request_id: Any) -> Request:
request_uuid = _parse_uuid_or_400(raw_request_id, "request_id")
req = db.get(Request, request_uuid)
if req is None:
raise HTTPException(status_code=400, detail="Заявка не найдена")
return req
def _active_lawyer_or_400(db: Session, lawyer_id: Any) -> AdminUser:
lawyer_uuid = _parse_uuid_or_400(lawyer_id, "assigned_lawyer_id")
lawyer = db.get(AdminUser, lawyer_uuid)
if lawyer is None or str(lawyer.role or "").upper() != "LAWYER" or not bool(lawyer.is_active):
raise HTTPException(status_code=400, detail="Можно назначить только активного юриста")
return lawyer
def _apply_admin_user_fields_for_create(payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if "password_hash" in data:
raise HTTPException(status_code=400, detail='Поле "password_hash" недоступно для записи')
raw_password = str(data.pop("password", "")).strip()
if not raw_password:
raise HTTPException(status_code=400, detail="Пароль обязателен")
role = str(data.get("role") or "").strip().upper()
if role not in ALLOWED_ADMIN_ROLES:
raise HTTPException(status_code=400, detail="Некорректная роль")
email = str(data.get("email") or "").strip().lower()
if not email:
raise HTTPException(status_code=400, detail="Email обязателен")
data["email"] = email
data["role"] = role
if "phone" in data:
data["phone"] = _normalize_optional_string(_normalize_client_phone(data.get("phone")))
data["avatar_url"] = _normalize_optional_string(data.get("avatar_url"))
data["primary_topic_code"] = _normalize_optional_string(data.get("primary_topic_code"))
data["password_hash"] = hash_password(raw_password)
return data
def _apply_admin_user_fields_for_update(payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if "password_hash" in data:
raise HTTPException(status_code=400, detail='Поле "password_hash" недоступно для записи')
if "password" in data:
raw_password = str(data.pop("password") or "").strip()
if not raw_password:
raise HTTPException(status_code=400, detail="Пароль не может быть пустым")
data["password_hash"] = hash_password(raw_password)
if "role" in data:
role = str(data.get("role") or "").strip().upper()
if role not in ALLOWED_ADMIN_ROLES:
raise HTTPException(status_code=400, detail="Некорректная роль")
data["role"] = role
if "email" in data:
email = str(data.get("email") or "").strip().lower()
if not email:
raise HTTPException(status_code=400, detail="Email не может быть пустым")
data["email"] = email
if "phone" in data:
data["phone"] = _normalize_optional_string(_normalize_client_phone(data.get("phone")))
if "avatar_url" in data:
data["avatar_url"] = _normalize_optional_string(data.get("avatar_url"))
if "primary_topic_code" in data:
data["primary_topic_code"] = _normalize_optional_string(data.get("primary_topic_code"))
return data
def _parse_uuid_or_400(value: Any, field_name: str) -> uuid.UUID:
try:
return uuid.UUID(str(value))
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть UUID')
def _apply_admin_user_topics_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if "admin_user_id" in data:
user_id = _parse_uuid_or_400(data.get("admin_user_id"), "admin_user_id")
user = db.get(AdminUser, user_id)
if user is None:
raise HTTPException(status_code=400, detail="Пользователь не найден")
if str(user.role or "").upper() != "LAWYER":
raise HTTPException(status_code=400, detail="Дополнительные темы доступны только для юриста")
data["admin_user_id"] = user_id
if "topic_code" in data:
topic_code = str(data.get("topic_code") or "").strip()
if not topic_code:
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
topic_exists = db.query(Topic.id).filter(Topic.code == topic_code).first()
if topic_exists is None:
raise HTTPException(status_code=400, detail="Тема не найдена")
data["topic_code"] = topic_code
return data
def _ensure_topic_exists_or_400(db: Session, topic_code: str) -> None:
exists = db.query(Topic.id).filter(Topic.code == topic_code).first()
if exists is None:
raise HTTPException(status_code=400, detail="Тема не найдена")
def _ensure_form_field_exists_or_400(db: Session, field_key: str) -> None:
exists = db.query(FormField.id).filter(FormField.key == field_key).first()
if exists is None:
raise HTTPException(status_code=400, detail="Поле формы не найдено")
def _ensure_status_exists_or_400(db: Session, status_code: str) -> None:
exists = db.query(Status.id).filter(Status.code == status_code).first()
if exists is None:
raise HTTPException(status_code=400, detail="Статус не найден")
def _as_positive_int_or_400(value: Any, field_name: str) -> int:
try:
number = int(value)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть целым числом')
if number <= 0:
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть больше 0')
return number
def _normalize_string_list_or_400(value: Any, field_name: str) -> list[str] | None:
if value is None:
return None
source = value
if isinstance(source, str):
text = source.strip()
if not text:
return None
if text.startswith("["):
try:
source = json.loads(text)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть JSON-массивом строк')
else:
source = [chunk.strip() for chunk in text.replace("\n", ",").split(",")]
if not isinstance(source, (list, tuple, set)):
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть массивом строк')
out: list[str] = []
seen: set[str] = set()
for item in source:
text = str(item or "").strip()
if not text:
continue
lowered = text.lower()
if lowered in seen:
continue
seen.add(lowered)
out.append(text)
return out
def _apply_topic_required_fields_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if "topic_code" in data:
topic_code = str(data.get("topic_code") or "").strip()
if not topic_code:
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
_ensure_topic_exists_or_400(db, topic_code)
data["topic_code"] = topic_code
if "field_key" in data:
field_key = str(data.get("field_key") or "").strip()
if not field_key:
raise HTTPException(status_code=400, detail='Поле "field_key" не может быть пустым')
_ensure_form_field_exists_or_400(db, field_key)
data["field_key"] = field_key
return data
def _apply_topic_data_templates_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if "topic_code" in data:
topic_code = str(data.get("topic_code") or "").strip()
if not topic_code:
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
_ensure_topic_exists_or_400(db, topic_code)
data["topic_code"] = topic_code
if "key" in data:
key = str(data.get("key") or "").strip()
if not key:
raise HTTPException(status_code=400, detail='Поле "key" не может быть пустым')
data["key"] = key
if "value_type" in data:
value_type = str(data.get("value_type") or "").strip().lower()
if value_type not in ALLOWED_REQUEST_DATA_VALUE_TYPES:
raise HTTPException(status_code=400, detail='Поле "value_type" должно быть одним из: string, text, date, number, file')
data["value_type"] = value_type
if "document_name" in data:
data["document_name"] = _normalize_optional_string(data.get("document_name"))
return data
def _apply_request_data_templates_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if "topic_code" in data:
topic_code = str(data.get("topic_code") or "").strip()
if not topic_code:
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
_ensure_topic_exists_or_400(db, topic_code)
data["topic_code"] = topic_code
if "name" in data:
name = str(data.get("name") or "").strip()
if not name:
raise HTTPException(status_code=400, detail='Поле "name" не может быть пустым')
data["name"] = name
if "description" in data:
data["description"] = _normalize_optional_string(data.get("description"))
if "created_by_admin_id" in data and data.get("created_by_admin_id") is not None:
admin_id = _parse_uuid_or_400(data.get("created_by_admin_id"), "created_by_admin_id")
admin_user = db.get(AdminUser, admin_id)
if admin_user is None:
raise HTTPException(status_code=400, detail="Пользователь не найден")
data["created_by_admin_id"] = admin_id
return data
def _apply_request_data_template_items_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
template = None
if "request_data_template_id" in data:
template_id = _parse_uuid_or_400(data.get("request_data_template_id"), "request_data_template_id")
template = db.get(RequestDataTemplate, template_id)
if template is None:
raise HTTPException(status_code=400, detail="Шаблон не найден")
data["request_data_template_id"] = template_id
if "topic_data_template_id" in data and data.get("topic_data_template_id") is not None:
catalog_id = _parse_uuid_or_400(data.get("topic_data_template_id"), "topic_data_template_id")
catalog = db.get(TopicDataTemplate, catalog_id)
if catalog is None:
raise HTTPException(status_code=400, detail="Поле доп. данных не найдено")
data["topic_data_template_id"] = catalog_id
if "key" not in data or not str(data.get("key") or "").strip():
data["key"] = str(catalog.key or "").strip()
if "label" not in data or not str(data.get("label") or "").strip():
data["label"] = str(catalog.label or catalog.key or "").strip()
if "value_type" not in data or not str(data.get("value_type") or "").strip():
data["value_type"] = str(catalog.value_type or "string")
if template is not None and str(template.topic_code or "").strip() and str(catalog.topic_code or "").strip():
if str(template.topic_code) != str(catalog.topic_code):
raise HTTPException(status_code=400, detail="Поле не соответствует теме шаблона")
if "key" in data:
key = str(data.get("key") or "").strip()
if not key:
raise HTTPException(status_code=400, detail='Поле "key" не может быть пустым')
data["key"] = key[:80]
if "label" in data:
label = str(data.get("label") or "").strip()
if not label:
raise HTTPException(status_code=400, detail='Поле "label" не может быть пустым')
data["label"] = label
if "value_type" in data:
value_type = str(data.get("value_type") or "").strip().lower()
if value_type not in ALLOWED_REQUEST_DATA_VALUE_TYPES:
raise HTTPException(status_code=400, detail='Поле "value_type" должно быть одним из: string, text, date, number, file')
data["value_type"] = value_type
if "sort_order" in data:
raw = data.get("sort_order")
if raw is None or str(raw).strip() == "":
data["sort_order"] = 0
else:
try:
data["sort_order"] = int(raw)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail='Поле "sort_order" должно быть целым числом')
return data
def _apply_request_data_requirements_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if "request_id" in data:
request_id = _parse_uuid_or_400(data.get("request_id"), "request_id")
request = db.get(Request, request_id)
if request is None:
raise HTTPException(status_code=400, detail="Заявка не найдена")
data["request_id"] = request_id
if "topic_template_id" in data and data.get("topic_template_id") is not None:
template_id = _parse_uuid_or_400(data.get("topic_template_id"), "topic_template_id")
template = db.get(TopicDataTemplate, template_id)
if template is None:
raise HTTPException(status_code=400, detail="Шаблон темы не найден")
data["topic_template_id"] = template_id
if "request_message_id" in data and data.get("request_message_id") is not None:
data["request_message_id"] = _parse_uuid_or_400(data.get("request_message_id"), "request_message_id")
if "key" in data:
key = str(data.get("key") or "").strip()
if not key:
raise HTTPException(status_code=400, detail='Поле "key" не может быть пустым')
data["key"] = key
if "field_type" in data:
field_type = str(data.get("field_type") or "").strip().lower()
if field_type not in ALLOWED_REQUEST_DATA_VALUE_TYPES:
raise HTTPException(status_code=400, detail='Поле "field_type" должно быть одним из: string, text, date, number, file')
data["field_type"] = field_type
if "document_name" in data:
data["document_name"] = _normalize_optional_string(data.get("document_name"))
if "value_text" in data:
data["value_text"] = _normalize_optional_string(data.get("value_text"))
if "sort_order" in data:
raw_sort = data.get("sort_order")
if raw_sort is None or str(raw_sort).strip() == "":
data["sort_order"] = 0
else:
try:
data["sort_order"] = int(raw_sort)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail='Поле "sort_order" должно быть целым числом')
return data
def _apply_topic_status_transitions_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
topic_code = None
from_status = None
to_status = None
if "topic_code" in data:
topic_code = str(data.get("topic_code") or "").strip()
if not topic_code:
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
_ensure_topic_exists_or_400(db, topic_code)
data["topic_code"] = topic_code
if "from_status" in data:
from_status = str(data.get("from_status") or "").strip()
if not from_status:
raise HTTPException(status_code=400, detail='Поле "from_status" не может быть пустым')
_ensure_status_exists_or_400(db, from_status)
data["from_status"] = from_status
if "to_status" in data:
to_status = str(data.get("to_status") or "").strip()
if not to_status:
raise HTTPException(status_code=400, detail='Поле "to_status" не может быть пустым')
_ensure_status_exists_or_400(db, to_status)
data["to_status"] = to_status
if from_status and to_status and from_status == to_status:
raise HTTPException(status_code=400, detail='Поля "from_status" и "to_status" не должны совпадать')
if "sla_hours" in data:
raw = data.get("sla_hours")
if raw is None or str(raw).strip() == "":
data["sla_hours"] = None
else:
data["sla_hours"] = _as_positive_int_or_400(raw, "sla_hours")
if "required_data_keys" in data:
data["required_data_keys"] = _normalize_string_list_or_400(data.get("required_data_keys"), "required_data_keys")
if "required_mime_types" in data:
data["required_mime_types"] = _normalize_string_list_or_400(data.get("required_mime_types"), "required_mime_types")
return data
def _apply_status_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if "kind" in data:
data["kind"] = normalize_status_kind_or_400(data.get("kind"))
if "status_group_id" in data:
raw_group = data.get("status_group_id")
if raw_group is None or str(raw_group).strip() == "":
data["status_group_id"] = None
else:
group_id = _parse_uuid_or_400(raw_group, "status_group_id")
group = db.get(StatusGroup, group_id)
if group is None:
raise HTTPException(status_code=400, detail="Группа статусов не найдена")
data["status_group_id"] = group_id
if "invoice_template" in data:
text = str(data.get("invoice_template") or "").strip()
data["invoice_template"] = text or None
return data
_RU_TO_LATIN = {
"а": "a",
"б": "b",
"в": "v",
"г": "g",
"д": "d",
"е": "e",
"ё": "e",
"ж": "zh",
"з": "z",
"и": "i",
"й": "y",
"к": "k",
"л": "l",
"м": "m",
"н": "n",
"о": "o",
"п": "p",
"р": "r",
"с": "s",
"т": "t",
"у": "u",
"ф": "f",
"х": "h",
"ц": "ts",
"ч": "ch",
"ш": "sh",
"щ": "sch",
"ъ": "",
"ы": "y",
"ь": "",
"э": "e",
"ю": "yu",
"я": "ya",
}
def _slugify(value: str, fallback: str) -> str:
raw = str(value or "").strip().lower()
if not raw:
return fallback
latin = "".join(_RU_TO_LATIN.get(ch, ch) for ch in raw)
out: list[str] = []
prev_dash = False
for ch in latin:
if ("a" <= ch <= "z") or ("0" <= ch <= "9"):
out.append(ch)
prev_dash = False
continue
if not prev_dash:
out.append("-")
prev_dash = True
slug = "".join(out).strip("-")
return slug or fallback
def _make_unique_value(db: Session, model: type, field_name: str, base_value: str) -> str:
columns = _columns_map(model)
column = columns[field_name]
max_len = getattr(column.type, "length", None)
base = base_value.strip("-") or field_name
if max_len:
base = base[:max_len]
field = getattr(model, field_name)
if not db.query(model).filter(field == base).first():
return base
idx = 2
while True:
suffix = f"-{idx}"
candidate = base
if max_len and len(candidate) + len(suffix) > max_len:
candidate = candidate[: max_len - len(suffix)]
candidate = (candidate + suffix).strip("-")
if not db.query(model).filter(field == candidate).first():
return candidate
idx += 1
def _next_sort_order_value(db: Session, model: type) -> int:
if "sort_order" not in _columns_map(model):
return 1
column = getattr(model, "sort_order", None)
if column is None:
return 1
current_max = db.query(func.max(column)).scalar()
try:
return int(current_max or 0) + 1
except (TypeError, ValueError):
return 1
def _apply_auto_fields_for_create(db: Session, model: type, table_name: str, payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload)
if table_name == "topics" and not str(data.get("code") or "").strip():
base = _slugify(str(data.get("name") or ""), "topic")
data["code"] = _make_unique_value(db, model, "code", base)
if table_name == "topics":
data["sort_order"] = _next_sort_order_value(db, model)
if table_name == "statuses" and not str(data.get("code") or "").strip():
base = _slugify(str(data.get("name") or ""), "status")
data["code"] = _make_unique_value(db, model, "code", base)
if table_name == "form_fields" and not str(data.get("key") or "").strip():
base = _slugify(str(data.get("label") or ""), "field")
data["key"] = _make_unique_value(db, model, "key", base)
if table_name == "admin_users":
data = _apply_admin_user_fields_for_create(data)
return data