Law/app/services/chat_crypto.py
2026-02-28 11:45:08 +03:00

83 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import base64
import hashlib
import hmac
import secrets
from app.core.config import settings
_VERSION = b"v1"
_PREFIX = "chatenc:v1:"
def _encryption_secret() -> str:
chat_secret = str(settings.CHAT_ENCRYPTION_SECRET or "").strip()
if chat_secret:
return chat_secret
fallback = str(settings.DATA_ENCRYPTION_SECRET or "").strip()
if fallback:
return fallback
fallback = str(settings.ADMIN_JWT_SECRET or "").strip()
if fallback:
return fallback
fallback = str(settings.PUBLIC_JWT_SECRET or "").strip()
if fallback:
return fallback
raise ValueError("Не задан секрет шифрования чата")
def _key() -> bytes:
return hashlib.sha256(_encryption_secret().encode("utf-8")).digest()
def _xor_bytes(a: bytes, b: bytes) -> bytes:
return bytes(x ^ y for x, y in zip(a, b))
def is_encrypted_message(value: str | None) -> bool:
token = str(value or "").strip()
return token.startswith(_PREFIX)
def encrypt_message_body(value: str | None) -> str | None:
if value is None:
return None
text = str(value)
if not text:
return text
if is_encrypted_message(text):
return text
raw = text.encode("utf-8")
nonce = secrets.token_bytes(16)
stream = hashlib.pbkdf2_hmac("sha256", _key(), nonce, 120_000, dklen=len(raw))
cipher = _xor_bytes(raw, stream)
tag = hmac.new(_key(), _VERSION + nonce + cipher, hashlib.sha256).digest()
token = _VERSION + nonce + tag + cipher
return _PREFIX + base64.urlsafe_b64encode(token).decode("ascii")
def decrypt_message_body(value: str | None) -> str | None:
if value is None:
return None
text = str(value)
if not text:
return text
if not is_encrypted_message(text):
return text
encoded = text[len(_PREFIX) :]
blob = base64.urlsafe_b64decode(encoded.encode("ascii"))
if len(blob) < 2 + 16 + 32:
raise ValueError("Некорректный зашифрованный формат сообщения")
version = blob[:2]
nonce = blob[2:18]
tag = blob[18:50]
cipher = blob[50:]
if version != _VERSION:
raise ValueError("Неподдерживаемая версия шифрования чата")
expected = hmac.new(_key(), version + nonce + cipher, hashlib.sha256).digest()
if not hmac.compare_digest(tag, expected):
raise ValueError("Поврежденные данные сообщения")
stream = hashlib.pbkdf2_hmac("sha256", _key(), nonce, 120_000, dklen=len(cipher))
raw = _xor_bytes(cipher, stream)
return raw.decode("utf-8")