Law/tests/test_attachment_scan.py
2026-03-01 17:31:09 +03:00

154 lines
5.8 KiB
Python

import os
import unittest
from uuid import UUID
from unittest.mock import patch
from botocore.exceptions import ClientError
from sqlalchemy import create_engine, delete
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
os.environ.setdefault("DATABASE_URL", "sqlite+pysqlite:///:memory:")
os.environ.setdefault("REDIS_URL", "redis://localhost:6379/0")
os.environ.setdefault("S3_ENDPOINT", "http://localhost:9000")
os.environ.setdefault("S3_ACCESS_KEY", "test")
os.environ.setdefault("S3_SECRET_KEY", "test")
os.environ.setdefault("S3_BUCKET", "test")
from app.models.attachment import Attachment
from app.models.request import Request
from app.models.security_audit_log import SecurityAuditLog
from app.services.attachment_scan import SCAN_STATUS_CLEAN, SCAN_STATUS_INFECTED, scan_attachment_file_impl
import app.services.attachment_scan as attachment_scan_module
from app.db import session as db_session
class _FakeBody:
def __init__(self, payload: bytes):
self.payload = payload
def iter_chunks(self, chunk_size=65536):
for i in range(0, len(self.payload), chunk_size):
yield self.payload[i : i + chunk_size]
class _FakeS3Storage:
def __init__(self):
self.objects = {}
def get_object(self, key: str) -> dict:
obj = self.objects.get(key)
if obj is None:
raise ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "GetObject")
return {"Body": _FakeBody(obj["content"]), "ContentType": obj["mime"], "ContentLength": obj["size"]}
class AttachmentScanTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
cls.SessionLocal = sessionmaker(bind=cls.engine, autocommit=False, autoflush=False)
Request.__table__.create(bind=cls.engine)
Attachment.__table__.create(bind=cls.engine)
SecurityAuditLog.__table__.create(bind=cls.engine)
cls._orig_session_local = db_session.SessionLocal
cls._orig_scan_session_local = attachment_scan_module.SessionLocal
db_session.SessionLocal = cls.SessionLocal
attachment_scan_module.SessionLocal = cls.SessionLocal
@classmethod
def tearDownClass(cls):
db_session.SessionLocal = cls._orig_session_local
attachment_scan_module.SessionLocal = cls._orig_scan_session_local
SecurityAuditLog.__table__.drop(bind=cls.engine)
Attachment.__table__.drop(bind=cls.engine)
Request.__table__.drop(bind=cls.engine)
cls.engine.dispose()
def setUp(self):
with self.SessionLocal() as db:
db.execute(delete(SecurityAuditLog))
db.execute(delete(Attachment))
db.execute(delete(Request))
db.commit()
def test_scan_marks_clean_for_valid_pdf_when_clamav_disabled(self):
fake_s3 = _FakeS3Storage()
with self.SessionLocal() as db:
req = Request(
track_number="TRK-SCAN-001",
client_name="Клиент",
client_phone="+79990000001",
topic_code="consulting",
status_code="NEW",
extra_fields={},
)
db.add(req)
db.flush()
key = f"requests/{req.id}/contract.pdf"
att = Attachment(
request_id=req.id,
file_name="contract.pdf",
mime_type="application/pdf",
size_bytes=64,
s3_key=key,
)
db.add(att)
db.commit()
attachment_id = str(att.id)
fake_s3.objects[key] = {"size": 64, "mime": "application/pdf", "content": b"%PDF-1.4\nhello"}
with (
patch("app.services.attachment_scan.get_s3_storage", return_value=fake_s3),
patch("app.services.attachment_scan.settings.CLAMAV_ENABLED", False),
):
result = scan_attachment_file_impl(attachment_id)
self.assertEqual(result.get("status"), SCAN_STATUS_CLEAN)
with self.SessionLocal() as db:
row = db.get(Attachment, UUID(attachment_id))
self.assertEqual(row.scan_status, SCAN_STATUS_CLEAN)
self.assertTrue(bool(row.content_sha256))
self.assertEqual(row.detected_mime, "application/pdf")
def test_scan_marks_infected_for_content_policy_mismatch(self):
fake_s3 = _FakeS3Storage()
with self.SessionLocal() as db:
req = Request(
track_number="TRK-SCAN-002",
client_name="Клиент",
client_phone="+79990000002",
topic_code="consulting",
status_code="NEW",
extra_fields={},
)
db.add(req)
db.flush()
key = f"requests/{req.id}/wrong.pdf"
att = Attachment(
request_id=req.id,
file_name="wrong.pdf",
mime_type="application/pdf",
size_bytes=64,
s3_key=key,
)
db.add(att)
db.commit()
attachment_id = str(att.id)
fake_s3.objects[key] = {"size": 64, "mime": "application/pdf", "content": b"\x89PNG\r\n\x1a\nbad"}
with (
patch("app.services.attachment_scan.get_s3_storage", return_value=fake_s3),
patch("app.services.attachment_scan.settings.CLAMAV_ENABLED", False),
):
result = scan_attachment_file_impl(attachment_id)
self.assertEqual(result.get("status"), SCAN_STATUS_INFECTED)
with self.SessionLocal() as db:
row = db.get(Attachment, UUID(attachment_id))
self.assertEqual(row.scan_status, SCAN_STATUS_INFECTED)
self.assertEqual(row.scan_signature, "CONTENT_POLICY")