Law/tests/test_crypto_kid_rotation.py
2026-03-02 16:22:07 +03:00

115 lines
4.8 KiB
Python

import base64
import hashlib
import hmac
import os
import unittest
os.environ.setdefault("DATABASE_URL", "sqlite+pysqlite:///:memory:")
os.environ.setdefault("REDIS_URL", "redis://localhost:6379/0")
os.environ.setdefault("S3_ENDPOINT", "http://localhost:9000")
os.environ.setdefault("S3_ACCESS_KEY", "test")
os.environ.setdefault("S3_SECRET_KEY", "test")
os.environ.setdefault("S3_BUCKET", "test")
from app.core.config import settings
from app.services.chat_crypto import decrypt_message_body, encrypt_message_body, extract_message_kid
from app.services.invoice_crypto import (
active_requisites_kid,
decrypt_requisites,
encrypt_requisites,
extract_requisites_kid,
)
def _xor_bytes(a: bytes, b: bytes) -> bytes:
return bytes(x ^ y for x, y in zip(a, b))
def _legacy_invoice_token(payload: dict, secret: str) -> str:
raw = (str(payload).replace("'", '"')).encode("utf-8")
# stable json-like payload for this test suite
raw = b'{"secret":"LEGACY"}' if payload.get("secret") == "LEGACY" else raw
key = hashlib.sha256(secret.encode("utf-8")).digest()
nonce = bytes.fromhex("00112233445566778899aabbccddeeff")
stream = hashlib.pbkdf2_hmac("sha256", key, nonce, 120_000, dklen=len(raw))
cipher = _xor_bytes(raw, stream)
tag = hmac.new(key, b"v1" + nonce + cipher, hashlib.sha256).digest()
token = b"v1" + nonce + tag + cipher
return base64.urlsafe_b64encode(token).decode("ascii")
def _legacy_chat_token(plaintext: str, secret: str) -> str:
raw = plaintext.encode("utf-8")
key = hashlib.sha256(secret.encode("utf-8")).digest()
nonce = bytes.fromhex("ffeeddccbbaa99887766554433221100")
stream = hashlib.pbkdf2_hmac("sha256", key, nonce, 120_000, dklen=len(raw))
cipher = _xor_bytes(raw, stream)
tag = hmac.new(key, b"v1" + nonce + cipher, hashlib.sha256).digest()
token = b"v1" + nonce + tag + cipher
return "chatenc:v1:" + base64.urlsafe_b64encode(token).decode("ascii")
class CryptoKidRotationTests(unittest.TestCase):
def setUp(self):
self._backup = {
"DATA_ENCRYPTION_SECRET": settings.DATA_ENCRYPTION_SECRET,
"DATA_ENCRYPTION_ACTIVE_KID": settings.DATA_ENCRYPTION_ACTIVE_KID,
"DATA_ENCRYPTION_KEYS": settings.DATA_ENCRYPTION_KEYS,
"CHAT_ENCRYPTION_SECRET": settings.CHAT_ENCRYPTION_SECRET,
"CHAT_ENCRYPTION_ACTIVE_KID": settings.CHAT_ENCRYPTION_ACTIVE_KID,
"CHAT_ENCRYPTION_KEYS": settings.CHAT_ENCRYPTION_KEYS,
}
def tearDown(self):
for key, value in self._backup.items():
setattr(settings, key, value)
def test_invoice_encrypt_uses_active_kid(self):
settings.DATA_ENCRYPTION_SECRET = "legacy-secret-1234567890"
settings.DATA_ENCRYPTION_ACTIVE_KID = "k2"
settings.DATA_ENCRYPTION_KEYS = "k1=old-secret-1111111111111111,k2=new-secret-2222222222222222"
token = encrypt_requisites({"inn": "7700000000"})
self.assertTrue(token.startswith("invenc:v2:"))
self.assertEqual(extract_requisites_kid(token), "k2")
payload = decrypt_requisites(token)
self.assertEqual(payload.get("inn"), "7700000000")
self.assertEqual(active_requisites_kid(), "k2")
def test_invoice_decrypts_legacy_after_rotation(self):
legacy_secret = "legacy-data-secret-aaaaaaaaaaaaaaaa"
legacy = _legacy_invoice_token({"secret": "LEGACY"}, legacy_secret)
settings.DATA_ENCRYPTION_SECRET = ""
settings.DATA_ENCRYPTION_ACTIVE_KID = "k2"
settings.DATA_ENCRYPTION_KEYS = f"k1={legacy_secret},k2=new-data-secret-bbbbbbbbbbbbbbbb"
payload = decrypt_requisites(legacy)
self.assertEqual(payload.get("secret"), "LEGACY")
rotated = encrypt_requisites(payload)
self.assertEqual(extract_requisites_kid(rotated), "k2")
self.assertEqual(decrypt_requisites(rotated).get("secret"), "LEGACY")
def test_chat_decrypts_legacy_and_writes_new_kid(self):
legacy_secret = "legacy-chat-secret-aaaaaaaaaaaaaaaa"
legacy_token = _legacy_chat_token("legacy message", legacy_secret)
settings.DATA_ENCRYPTION_SECRET = ""
settings.DATA_ENCRYPTION_ACTIVE_KID = "k2"
settings.DATA_ENCRYPTION_KEYS = "k2=new-data-secret-bbbbbbbbbbbbbbbb"
settings.CHAT_ENCRYPTION_SECRET = ""
settings.CHAT_ENCRYPTION_ACTIVE_KID = "k2"
settings.CHAT_ENCRYPTION_KEYS = f"k1={legacy_secret},k2=new-chat-secret-cccccccccccccccc"
plain = decrypt_message_body(legacy_token)
self.assertEqual(plain, "legacy message")
token = encrypt_message_body(plain)
self.assertTrue(token.startswith("chatenc:v2:"))
self.assertEqual(extract_message_kid(token), "k2")
self.assertEqual(decrypt_message_body(token), "legacy message")
if __name__ == "__main__":
unittest.main()