mirror of
https://github.com/TronoSfera/Law.git
synced 2026-05-18 18:13:46 +03:00
83 lines
2.6 KiB
Python
83 lines
2.6 KiB
Python
from __future__ import annotations
|
||
|
||
import base64
|
||
import hashlib
|
||
import hmac
|
||
import secrets
|
||
|
||
from app.core.config import settings
|
||
|
||
_VERSION = b"v1"
|
||
_PREFIX = "chatenc:v1:"
|
||
|
||
|
||
def _encryption_secret() -> str:
|
||
chat_secret = str(settings.CHAT_ENCRYPTION_SECRET or "").strip()
|
||
if chat_secret:
|
||
return chat_secret
|
||
fallback = str(settings.DATA_ENCRYPTION_SECRET or "").strip()
|
||
if fallback:
|
||
return fallback
|
||
fallback = str(settings.ADMIN_JWT_SECRET or "").strip()
|
||
if fallback:
|
||
return fallback
|
||
fallback = str(settings.PUBLIC_JWT_SECRET or "").strip()
|
||
if fallback:
|
||
return fallback
|
||
raise ValueError("Не задан секрет шифрования чата")
|
||
|
||
|
||
def _key() -> bytes:
|
||
return hashlib.sha256(_encryption_secret().encode("utf-8")).digest()
|
||
|
||
|
||
def _xor_bytes(a: bytes, b: bytes) -> bytes:
|
||
return bytes(x ^ y for x, y in zip(a, b))
|
||
|
||
|
||
def is_encrypted_message(value: str | None) -> bool:
|
||
token = str(value or "").strip()
|
||
return token.startswith(_PREFIX)
|
||
|
||
|
||
def encrypt_message_body(value: str | None) -> str | None:
|
||
if value is None:
|
||
return None
|
||
text = str(value)
|
||
if not text:
|
||
return text
|
||
if is_encrypted_message(text):
|
||
return text
|
||
raw = text.encode("utf-8")
|
||
nonce = secrets.token_bytes(16)
|
||
stream = hashlib.pbkdf2_hmac("sha256", _key(), nonce, 120_000, dklen=len(raw))
|
||
cipher = _xor_bytes(raw, stream)
|
||
tag = hmac.new(_key(), _VERSION + nonce + cipher, hashlib.sha256).digest()
|
||
token = _VERSION + nonce + tag + cipher
|
||
return _PREFIX + base64.urlsafe_b64encode(token).decode("ascii")
|
||
|
||
|
||
def decrypt_message_body(value: str | None) -> str | None:
|
||
if value is None:
|
||
return None
|
||
text = str(value)
|
||
if not text:
|
||
return text
|
||
if not is_encrypted_message(text):
|
||
return text
|
||
encoded = text[len(_PREFIX) :]
|
||
blob = base64.urlsafe_b64decode(encoded.encode("ascii"))
|
||
if len(blob) < 2 + 16 + 32:
|
||
raise ValueError("Некорректный зашифрованный формат сообщения")
|
||
version = blob[:2]
|
||
nonce = blob[2:18]
|
||
tag = blob[18:50]
|
||
cipher = blob[50:]
|
||
if version != _VERSION:
|
||
raise ValueError("Неподдерживаемая версия шифрования чата")
|
||
expected = hmac.new(_key(), version + nonce + cipher, hashlib.sha256).digest()
|
||
if not hmac.compare_digest(tag, expected):
|
||
raise ValueError("Поврежденные данные сообщения")
|
||
stream = hashlib.pbkdf2_hmac("sha256", _key(), nonce, 120_000, dklen=len(cipher))
|
||
raw = _xor_bytes(cipher, stream)
|
||
return raw.decode("utf-8")
|