mirror of
https://github.com/TronoSfera/Law.git
synced 2026-05-18 10:03:45 +03:00
641 lines
26 KiB
Python
641 lines
26 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import uuid
|
||
from typing import Any
|
||
|
||
from fastapi import HTTPException
|
||
from sqlalchemy import func
|
||
from sqlalchemy.inspection import inspect as sa_inspect
|
||
from sqlalchemy.orm import Session
|
||
|
||
from app.core.security import hash_password
|
||
from app.models.admin_user import AdminUser
|
||
from app.models.client import Client
|
||
from app.models.form_field import FormField
|
||
from app.models.request import Request
|
||
from app.models.request_data_requirement import RequestDataRequirement
|
||
from app.models.request_data_template import RequestDataTemplate
|
||
from app.models.request_data_template_item import RequestDataTemplateItem
|
||
from app.models.status import Status
|
||
from app.models.status_group import StatusGroup
|
||
from app.models.topic import Topic
|
||
from app.models.topic_data_template import TopicDataTemplate
|
||
from app.services.billing_flow import normalize_status_kind_or_400
|
||
|
||
from .access import ALLOWED_ADMIN_ROLES, ALLOWED_REQUEST_DATA_VALUE_TYPES
|
||
from .meta import SYSTEM_FIELDS, _columns_map, _protected_input_fields
|
||
|
||
def _sanitize_payload(
|
||
model: type,
|
||
table_name: str,
|
||
payload: dict[str, Any],
|
||
*,
|
||
is_update: bool,
|
||
allow_protected_fields: set[str] | None = None,
|
||
) -> dict[str, Any]:
|
||
if not isinstance(payload, dict):
|
||
raise HTTPException(status_code=400, detail="Тело запроса должно быть JSON-объектом")
|
||
|
||
columns = _columns_map(model)
|
||
allowed_hidden = set(allow_protected_fields or set())
|
||
mutable_columns = {
|
||
name
|
||
for name in columns.keys()
|
||
if name not in SYSTEM_FIELDS and (name not in _protected_input_fields(table_name) or name in allowed_hidden)
|
||
}
|
||
|
||
unknown_fields = sorted(set(payload.keys()) - mutable_columns)
|
||
if unknown_fields:
|
||
raise HTTPException(status_code=400, detail="Неизвестные поля: " + ", ".join(unknown_fields))
|
||
|
||
cleaned: dict[str, Any] = {}
|
||
for key, value in payload.items():
|
||
column = columns[key]
|
||
if value is None and not column.nullable:
|
||
raise HTTPException(status_code=400, detail=f'Поле "{key}" не может быть null')
|
||
cleaned[key] = value
|
||
|
||
if is_update:
|
||
if not cleaned:
|
||
raise HTTPException(status_code=400, detail="Нет полей для обновления")
|
||
return cleaned
|
||
|
||
required_missing: list[str] = []
|
||
for name, column in columns.items():
|
||
if name in SYSTEM_FIELDS:
|
||
continue
|
||
if column.nullable:
|
||
continue
|
||
if column.default is not None or column.server_default is not None:
|
||
continue
|
||
if name not in cleaned:
|
||
required_missing.append(name)
|
||
if required_missing:
|
||
raise HTTPException(status_code=400, detail="Отсутствуют обязательные поля: " + ", ".join(sorted(required_missing)))
|
||
|
||
return cleaned
|
||
|
||
|
||
def _pk_value(model: type, row_id: str) -> Any:
|
||
pk = sa_inspect(model).primary_key
|
||
if len(pk) != 1:
|
||
raise HTTPException(status_code=400, detail="Поддерживаются только таблицы с одним первичным ключом")
|
||
pk_column = pk[0]
|
||
try:
|
||
python_type = pk_column.type.python_type
|
||
except Exception:
|
||
python_type = str
|
||
if python_type is uuid.UUID:
|
||
try:
|
||
return uuid.UUID(str(row_id))
|
||
except ValueError:
|
||
raise HTTPException(status_code=400, detail="Некорректный идентификатор")
|
||
return row_id
|
||
|
||
|
||
def _load_row_or_404(db: Session, model: type, row_id: str):
|
||
entity = db.get(model, _pk_value(model, row_id))
|
||
if entity is None:
|
||
raise HTTPException(status_code=404, detail="Запись не найдена")
|
||
return entity
|
||
|
||
|
||
def _prepare_create_payload(table_name: str, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if table_name == "requests":
|
||
track_number = str(data.get("track_number") or "").strip()
|
||
data["track_number"] = track_number or f"TRK-{uuid.uuid4().hex[:10].upper()}"
|
||
if data.get("extra_fields") is None:
|
||
data["extra_fields"] = {}
|
||
return data
|
||
|
||
|
||
def _normalize_optional_string(value: Any) -> str | None:
|
||
text = str(value or "").strip()
|
||
return text or None
|
||
|
||
|
||
def _normalize_client_phone(value: Any) -> str:
|
||
text = str(value or "").strip()
|
||
if not text:
|
||
return ""
|
||
allowed = {"+", "(", ")", "-", " "}
|
||
return "".join(ch for ch in text if ch.isdigit() or ch in allowed).strip()
|
||
|
||
|
||
def _upsert_client_or_400(db: Session, *, full_name: Any, phone: Any, responsible: str) -> Client:
|
||
normalized_phone = _normalize_client_phone(phone)
|
||
if not normalized_phone:
|
||
raise HTTPException(status_code=400, detail='Поле "client_phone" обязательно')
|
||
normalized_name = str(full_name or "").strip() or "Клиент"
|
||
|
||
row = db.query(Client).filter(Client.phone == normalized_phone).first()
|
||
if row is None:
|
||
row = Client(
|
||
full_name=normalized_name,
|
||
phone=normalized_phone,
|
||
responsible=responsible or "Администратор системы",
|
||
)
|
||
db.add(row)
|
||
db.flush()
|
||
return row
|
||
|
||
changed = False
|
||
if normalized_name and row.full_name != normalized_name:
|
||
row.full_name = normalized_name
|
||
changed = True
|
||
if responsible and row.responsible != responsible:
|
||
row.responsible = responsible
|
||
changed = True
|
||
if changed:
|
||
db.add(row)
|
||
db.flush()
|
||
return row
|
||
|
||
|
||
def _request_for_uuid_or_400(db: Session, raw_request_id: Any) -> Request:
|
||
request_uuid = _parse_uuid_or_400(raw_request_id, "request_id")
|
||
req = db.get(Request, request_uuid)
|
||
if req is None:
|
||
raise HTTPException(status_code=400, detail="Заявка не найдена")
|
||
return req
|
||
|
||
|
||
def _active_lawyer_or_400(db: Session, lawyer_id: Any) -> AdminUser:
|
||
lawyer_uuid = _parse_uuid_or_400(lawyer_id, "assigned_lawyer_id")
|
||
lawyer = db.get(AdminUser, lawyer_uuid)
|
||
if lawyer is None or str(lawyer.role or "").upper() != "LAWYER" or not bool(lawyer.is_active):
|
||
raise HTTPException(status_code=400, detail="Можно назначить только активного юриста")
|
||
return lawyer
|
||
|
||
|
||
def _apply_admin_user_fields_for_create(payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if "password_hash" in data:
|
||
raise HTTPException(status_code=400, detail='Поле "password_hash" недоступно для записи')
|
||
raw_password = str(data.pop("password", "")).strip()
|
||
if not raw_password:
|
||
raise HTTPException(status_code=400, detail="Пароль обязателен")
|
||
role = str(data.get("role") or "").strip().upper()
|
||
if role not in ALLOWED_ADMIN_ROLES:
|
||
raise HTTPException(status_code=400, detail="Некорректная роль")
|
||
email = str(data.get("email") or "").strip().lower()
|
||
if not email:
|
||
raise HTTPException(status_code=400, detail="Email обязателен")
|
||
data["email"] = email
|
||
data["role"] = role
|
||
if "phone" in data:
|
||
data["phone"] = _normalize_optional_string(_normalize_client_phone(data.get("phone")))
|
||
data["avatar_url"] = _normalize_optional_string(data.get("avatar_url"))
|
||
data["primary_topic_code"] = _normalize_optional_string(data.get("primary_topic_code"))
|
||
data["password_hash"] = hash_password(raw_password)
|
||
return data
|
||
|
||
|
||
def _apply_admin_user_fields_for_update(payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if "password_hash" in data:
|
||
raise HTTPException(status_code=400, detail='Поле "password_hash" недоступно для записи')
|
||
if "password" in data:
|
||
raw_password = str(data.pop("password") or "").strip()
|
||
if not raw_password:
|
||
raise HTTPException(status_code=400, detail="Пароль не может быть пустым")
|
||
data["password_hash"] = hash_password(raw_password)
|
||
if "role" in data:
|
||
role = str(data.get("role") or "").strip().upper()
|
||
if role not in ALLOWED_ADMIN_ROLES:
|
||
raise HTTPException(status_code=400, detail="Некорректная роль")
|
||
data["role"] = role
|
||
if "email" in data:
|
||
email = str(data.get("email") or "").strip().lower()
|
||
if not email:
|
||
raise HTTPException(status_code=400, detail="Email не может быть пустым")
|
||
data["email"] = email
|
||
if "phone" in data:
|
||
data["phone"] = _normalize_optional_string(_normalize_client_phone(data.get("phone")))
|
||
if "avatar_url" in data:
|
||
data["avatar_url"] = _normalize_optional_string(data.get("avatar_url"))
|
||
if "primary_topic_code" in data:
|
||
data["primary_topic_code"] = _normalize_optional_string(data.get("primary_topic_code"))
|
||
return data
|
||
|
||
|
||
def _parse_uuid_or_400(value: Any, field_name: str) -> uuid.UUID:
|
||
try:
|
||
return uuid.UUID(str(value))
|
||
except (TypeError, ValueError):
|
||
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть UUID')
|
||
|
||
|
||
def _apply_admin_user_topics_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if "admin_user_id" in data:
|
||
user_id = _parse_uuid_or_400(data.get("admin_user_id"), "admin_user_id")
|
||
user = db.get(AdminUser, user_id)
|
||
if user is None:
|
||
raise HTTPException(status_code=400, detail="Пользователь не найден")
|
||
if str(user.role or "").upper() != "LAWYER":
|
||
raise HTTPException(status_code=400, detail="Дополнительные темы доступны только для юриста")
|
||
data["admin_user_id"] = user_id
|
||
if "topic_code" in data:
|
||
topic_code = str(data.get("topic_code") or "").strip()
|
||
if not topic_code:
|
||
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
|
||
topic_exists = db.query(Topic.id).filter(Topic.code == topic_code).first()
|
||
if topic_exists is None:
|
||
raise HTTPException(status_code=400, detail="Тема не найдена")
|
||
data["topic_code"] = topic_code
|
||
return data
|
||
|
||
|
||
def _ensure_topic_exists_or_400(db: Session, topic_code: str) -> None:
|
||
exists = db.query(Topic.id).filter(Topic.code == topic_code).first()
|
||
if exists is None:
|
||
raise HTTPException(status_code=400, detail="Тема не найдена")
|
||
|
||
|
||
def _ensure_form_field_exists_or_400(db: Session, field_key: str) -> None:
|
||
exists = db.query(FormField.id).filter(FormField.key == field_key).first()
|
||
if exists is None:
|
||
raise HTTPException(status_code=400, detail="Поле формы не найдено")
|
||
|
||
|
||
def _ensure_status_exists_or_400(db: Session, status_code: str) -> None:
|
||
exists = db.query(Status.id).filter(Status.code == status_code).first()
|
||
if exists is None:
|
||
raise HTTPException(status_code=400, detail="Статус не найден")
|
||
|
||
|
||
def _as_positive_int_or_400(value: Any, field_name: str) -> int:
|
||
try:
|
||
number = int(value)
|
||
except (TypeError, ValueError):
|
||
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть целым числом')
|
||
if number <= 0:
|
||
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть больше 0')
|
||
return number
|
||
|
||
|
||
def _normalize_string_list_or_400(value: Any, field_name: str) -> list[str] | None:
|
||
if value is None:
|
||
return None
|
||
|
||
source = value
|
||
if isinstance(source, str):
|
||
text = source.strip()
|
||
if not text:
|
||
return None
|
||
if text.startswith("["):
|
||
try:
|
||
source = json.loads(text)
|
||
except json.JSONDecodeError:
|
||
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть JSON-массивом строк')
|
||
else:
|
||
source = [chunk.strip() for chunk in text.replace("\n", ",").split(",")]
|
||
|
||
if not isinstance(source, (list, tuple, set)):
|
||
raise HTTPException(status_code=400, detail=f'Поле "{field_name}" должно быть массивом строк')
|
||
|
||
out: list[str] = []
|
||
seen: set[str] = set()
|
||
for item in source:
|
||
text = str(item or "").strip()
|
||
if not text:
|
||
continue
|
||
lowered = text.lower()
|
||
if lowered in seen:
|
||
continue
|
||
seen.add(lowered)
|
||
out.append(text)
|
||
return out
|
||
|
||
|
||
def _apply_topic_required_fields_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if "topic_code" in data:
|
||
topic_code = str(data.get("topic_code") or "").strip()
|
||
if not topic_code:
|
||
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
|
||
_ensure_topic_exists_or_400(db, topic_code)
|
||
data["topic_code"] = topic_code
|
||
if "field_key" in data:
|
||
field_key = str(data.get("field_key") or "").strip()
|
||
if not field_key:
|
||
raise HTTPException(status_code=400, detail='Поле "field_key" не может быть пустым')
|
||
_ensure_form_field_exists_or_400(db, field_key)
|
||
data["field_key"] = field_key
|
||
return data
|
||
|
||
|
||
def _apply_topic_data_templates_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if "topic_code" in data:
|
||
topic_code = str(data.get("topic_code") or "").strip()
|
||
if not topic_code:
|
||
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
|
||
_ensure_topic_exists_or_400(db, topic_code)
|
||
data["topic_code"] = topic_code
|
||
if "key" in data:
|
||
key = str(data.get("key") or "").strip()
|
||
if not key:
|
||
raise HTTPException(status_code=400, detail='Поле "key" не может быть пустым')
|
||
data["key"] = key
|
||
if "value_type" in data:
|
||
value_type = str(data.get("value_type") or "").strip().lower()
|
||
if value_type not in ALLOWED_REQUEST_DATA_VALUE_TYPES:
|
||
raise HTTPException(status_code=400, detail='Поле "value_type" должно быть одним из: string, text, date, number, file')
|
||
data["value_type"] = value_type
|
||
if "document_name" in data:
|
||
data["document_name"] = _normalize_optional_string(data.get("document_name"))
|
||
return data
|
||
|
||
|
||
def _apply_request_data_templates_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if "topic_code" in data:
|
||
topic_code = str(data.get("topic_code") or "").strip()
|
||
if not topic_code:
|
||
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
|
||
_ensure_topic_exists_or_400(db, topic_code)
|
||
data["topic_code"] = topic_code
|
||
if "name" in data:
|
||
name = str(data.get("name") or "").strip()
|
||
if not name:
|
||
raise HTTPException(status_code=400, detail='Поле "name" не может быть пустым')
|
||
data["name"] = name
|
||
if "description" in data:
|
||
data["description"] = _normalize_optional_string(data.get("description"))
|
||
if "created_by_admin_id" in data and data.get("created_by_admin_id") is not None:
|
||
admin_id = _parse_uuid_or_400(data.get("created_by_admin_id"), "created_by_admin_id")
|
||
admin_user = db.get(AdminUser, admin_id)
|
||
if admin_user is None:
|
||
raise HTTPException(status_code=400, detail="Пользователь не найден")
|
||
data["created_by_admin_id"] = admin_id
|
||
return data
|
||
|
||
|
||
def _apply_request_data_template_items_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
template = None
|
||
if "request_data_template_id" in data:
|
||
template_id = _parse_uuid_or_400(data.get("request_data_template_id"), "request_data_template_id")
|
||
template = db.get(RequestDataTemplate, template_id)
|
||
if template is None:
|
||
raise HTTPException(status_code=400, detail="Шаблон не найден")
|
||
data["request_data_template_id"] = template_id
|
||
if "topic_data_template_id" in data and data.get("topic_data_template_id") is not None:
|
||
catalog_id = _parse_uuid_or_400(data.get("topic_data_template_id"), "topic_data_template_id")
|
||
catalog = db.get(TopicDataTemplate, catalog_id)
|
||
if catalog is None:
|
||
raise HTTPException(status_code=400, detail="Поле доп. данных не найдено")
|
||
data["topic_data_template_id"] = catalog_id
|
||
if "key" not in data or not str(data.get("key") or "").strip():
|
||
data["key"] = str(catalog.key or "").strip()
|
||
if "label" not in data or not str(data.get("label") or "").strip():
|
||
data["label"] = str(catalog.label or catalog.key or "").strip()
|
||
if "value_type" not in data or not str(data.get("value_type") or "").strip():
|
||
data["value_type"] = str(catalog.value_type or "string")
|
||
if template is not None and str(template.topic_code or "").strip() and str(catalog.topic_code or "").strip():
|
||
if str(template.topic_code) != str(catalog.topic_code):
|
||
raise HTTPException(status_code=400, detail="Поле не соответствует теме шаблона")
|
||
if "key" in data:
|
||
key = str(data.get("key") or "").strip()
|
||
if not key:
|
||
raise HTTPException(status_code=400, detail='Поле "key" не может быть пустым')
|
||
data["key"] = key[:80]
|
||
if "label" in data:
|
||
label = str(data.get("label") or "").strip()
|
||
if not label:
|
||
raise HTTPException(status_code=400, detail='Поле "label" не может быть пустым')
|
||
data["label"] = label
|
||
if "value_type" in data:
|
||
value_type = str(data.get("value_type") or "").strip().lower()
|
||
if value_type not in ALLOWED_REQUEST_DATA_VALUE_TYPES:
|
||
raise HTTPException(status_code=400, detail='Поле "value_type" должно быть одним из: string, text, date, number, file')
|
||
data["value_type"] = value_type
|
||
if "sort_order" in data:
|
||
raw = data.get("sort_order")
|
||
if raw is None or str(raw).strip() == "":
|
||
data["sort_order"] = 0
|
||
else:
|
||
try:
|
||
data["sort_order"] = int(raw)
|
||
except (TypeError, ValueError):
|
||
raise HTTPException(status_code=400, detail='Поле "sort_order" должно быть целым числом')
|
||
return data
|
||
|
||
|
||
def _apply_request_data_requirements_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if "request_id" in data:
|
||
request_id = _parse_uuid_or_400(data.get("request_id"), "request_id")
|
||
request = db.get(Request, request_id)
|
||
if request is None:
|
||
raise HTTPException(status_code=400, detail="Заявка не найдена")
|
||
data["request_id"] = request_id
|
||
if "topic_template_id" in data and data.get("topic_template_id") is not None:
|
||
template_id = _parse_uuid_or_400(data.get("topic_template_id"), "topic_template_id")
|
||
template = db.get(TopicDataTemplate, template_id)
|
||
if template is None:
|
||
raise HTTPException(status_code=400, detail="Шаблон темы не найден")
|
||
data["topic_template_id"] = template_id
|
||
if "request_message_id" in data and data.get("request_message_id") is not None:
|
||
data["request_message_id"] = _parse_uuid_or_400(data.get("request_message_id"), "request_message_id")
|
||
if "key" in data:
|
||
key = str(data.get("key") or "").strip()
|
||
if not key:
|
||
raise HTTPException(status_code=400, detail='Поле "key" не может быть пустым')
|
||
data["key"] = key
|
||
if "field_type" in data:
|
||
field_type = str(data.get("field_type") or "").strip().lower()
|
||
if field_type not in ALLOWED_REQUEST_DATA_VALUE_TYPES:
|
||
raise HTTPException(status_code=400, detail='Поле "field_type" должно быть одним из: string, text, date, number, file')
|
||
data["field_type"] = field_type
|
||
if "document_name" in data:
|
||
data["document_name"] = _normalize_optional_string(data.get("document_name"))
|
||
if "value_text" in data:
|
||
data["value_text"] = _normalize_optional_string(data.get("value_text"))
|
||
if "sort_order" in data:
|
||
raw_sort = data.get("sort_order")
|
||
if raw_sort is None or str(raw_sort).strip() == "":
|
||
data["sort_order"] = 0
|
||
else:
|
||
try:
|
||
data["sort_order"] = int(raw_sort)
|
||
except (TypeError, ValueError):
|
||
raise HTTPException(status_code=400, detail='Поле "sort_order" должно быть целым числом')
|
||
return data
|
||
|
||
|
||
def _apply_topic_status_transitions_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
topic_code = None
|
||
from_status = None
|
||
to_status = None
|
||
|
||
if "topic_code" in data:
|
||
topic_code = str(data.get("topic_code") or "").strip()
|
||
if not topic_code:
|
||
raise HTTPException(status_code=400, detail='Поле "topic_code" не может быть пустым')
|
||
_ensure_topic_exists_or_400(db, topic_code)
|
||
data["topic_code"] = topic_code
|
||
if "from_status" in data:
|
||
from_status = str(data.get("from_status") or "").strip()
|
||
if not from_status:
|
||
raise HTTPException(status_code=400, detail='Поле "from_status" не может быть пустым')
|
||
_ensure_status_exists_or_400(db, from_status)
|
||
data["from_status"] = from_status
|
||
if "to_status" in data:
|
||
to_status = str(data.get("to_status") or "").strip()
|
||
if not to_status:
|
||
raise HTTPException(status_code=400, detail='Поле "to_status" не может быть пустым')
|
||
_ensure_status_exists_or_400(db, to_status)
|
||
data["to_status"] = to_status
|
||
|
||
if from_status and to_status and from_status == to_status:
|
||
raise HTTPException(status_code=400, detail='Поля "from_status" и "to_status" не должны совпадать')
|
||
|
||
if "sla_hours" in data:
|
||
raw = data.get("sla_hours")
|
||
if raw is None or str(raw).strip() == "":
|
||
data["sla_hours"] = None
|
||
else:
|
||
data["sla_hours"] = _as_positive_int_or_400(raw, "sla_hours")
|
||
if "required_data_keys" in data:
|
||
data["required_data_keys"] = _normalize_string_list_or_400(data.get("required_data_keys"), "required_data_keys")
|
||
if "required_mime_types" in data:
|
||
data["required_mime_types"] = _normalize_string_list_or_400(data.get("required_mime_types"), "required_mime_types")
|
||
|
||
return data
|
||
|
||
|
||
def _apply_status_fields(db: Session, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if "kind" in data:
|
||
data["kind"] = normalize_status_kind_or_400(data.get("kind"))
|
||
if "status_group_id" in data:
|
||
raw_group = data.get("status_group_id")
|
||
if raw_group is None or str(raw_group).strip() == "":
|
||
data["status_group_id"] = None
|
||
else:
|
||
group_id = _parse_uuid_or_400(raw_group, "status_group_id")
|
||
group = db.get(StatusGroup, group_id)
|
||
if group is None:
|
||
raise HTTPException(status_code=400, detail="Группа статусов не найдена")
|
||
data["status_group_id"] = group_id
|
||
if "invoice_template" in data:
|
||
text = str(data.get("invoice_template") or "").strip()
|
||
data["invoice_template"] = text or None
|
||
return data
|
||
|
||
|
||
_RU_TO_LATIN = {
|
||
"а": "a",
|
||
"б": "b",
|
||
"в": "v",
|
||
"г": "g",
|
||
"д": "d",
|
||
"е": "e",
|
||
"ё": "e",
|
||
"ж": "zh",
|
||
"з": "z",
|
||
"и": "i",
|
||
"й": "y",
|
||
"к": "k",
|
||
"л": "l",
|
||
"м": "m",
|
||
"н": "n",
|
||
"о": "o",
|
||
"п": "p",
|
||
"р": "r",
|
||
"с": "s",
|
||
"т": "t",
|
||
"у": "u",
|
||
"ф": "f",
|
||
"х": "h",
|
||
"ц": "ts",
|
||
"ч": "ch",
|
||
"ш": "sh",
|
||
"щ": "sch",
|
||
"ъ": "",
|
||
"ы": "y",
|
||
"ь": "",
|
||
"э": "e",
|
||
"ю": "yu",
|
||
"я": "ya",
|
||
}
|
||
|
||
|
||
def _slugify(value: str, fallback: str) -> str:
|
||
raw = str(value or "").strip().lower()
|
||
if not raw:
|
||
return fallback
|
||
latin = "".join(_RU_TO_LATIN.get(ch, ch) for ch in raw)
|
||
out: list[str] = []
|
||
prev_dash = False
|
||
for ch in latin:
|
||
if ("a" <= ch <= "z") or ("0" <= ch <= "9"):
|
||
out.append(ch)
|
||
prev_dash = False
|
||
continue
|
||
if not prev_dash:
|
||
out.append("-")
|
||
prev_dash = True
|
||
slug = "".join(out).strip("-")
|
||
return slug or fallback
|
||
|
||
|
||
def _make_unique_value(db: Session, model: type, field_name: str, base_value: str) -> str:
|
||
columns = _columns_map(model)
|
||
column = columns[field_name]
|
||
max_len = getattr(column.type, "length", None)
|
||
base = base_value.strip("-") or field_name
|
||
if max_len:
|
||
base = base[:max_len]
|
||
|
||
field = getattr(model, field_name)
|
||
if not db.query(model).filter(field == base).first():
|
||
return base
|
||
|
||
idx = 2
|
||
while True:
|
||
suffix = f"-{idx}"
|
||
candidate = base
|
||
if max_len and len(candidate) + len(suffix) > max_len:
|
||
candidate = candidate[: max_len - len(suffix)]
|
||
candidate = (candidate + suffix).strip("-")
|
||
if not db.query(model).filter(field == candidate).first():
|
||
return candidate
|
||
idx += 1
|
||
|
||
|
||
def _next_sort_order_value(db: Session, model: type) -> int:
|
||
if "sort_order" not in _columns_map(model):
|
||
return 1
|
||
column = getattr(model, "sort_order", None)
|
||
if column is None:
|
||
return 1
|
||
current_max = db.query(func.max(column)).scalar()
|
||
try:
|
||
return int(current_max or 0) + 1
|
||
except (TypeError, ValueError):
|
||
return 1
|
||
|
||
|
||
def _apply_auto_fields_for_create(db: Session, model: type, table_name: str, payload: dict[str, Any]) -> dict[str, Any]:
|
||
data = dict(payload)
|
||
if table_name == "topics" and not str(data.get("code") or "").strip():
|
||
base = _slugify(str(data.get("name") or ""), "topic")
|
||
data["code"] = _make_unique_value(db, model, "code", base)
|
||
if table_name == "topics":
|
||
data["sort_order"] = _next_sort_order_value(db, model)
|
||
if table_name == "statuses" and not str(data.get("code") or "").strip():
|
||
base = _slugify(str(data.get("name") or ""), "status")
|
||
data["code"] = _make_unique_value(db, model, "code", base)
|
||
if table_name == "form_fields" and not str(data.get("key") or "").strip():
|
||
base = _slugify(str(data.get("label") or ""), "field")
|
||
data["key"] = _make_unique_value(db, model, "key", base)
|
||
if table_name == "admin_users":
|
||
data = _apply_admin_user_fields_for_create(data)
|
||
return data
|