Law/app/services/security_audit.py
2026-03-02 16:22:07 +03:00

173 lines
5.3 KiB
Python

from __future__ import annotations
import logging
import uuid
from datetime import timedelta
from typing import Any
from fastapi import Request as FastapiRequest
from sqlalchemy import func, inspect
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.models.security_audit_log import SecurityAuditLog
from app.models.common import utcnow
logger = logging.getLogger(__name__)
SUSPICIOUS_DENY_WINDOW_MINUTES = 10
SUSPICIOUS_DENY_THRESHOLD = 5
def _uuid_or_none(raw: str | uuid.UUID | None) -> uuid.UUID | None:
if raw is None:
return None
if isinstance(raw, uuid.UUID):
return raw
try:
return uuid.UUID(str(raw))
except (TypeError, ValueError):
return None
def _safe_details(details: dict[str, Any] | None) -> dict[str, Any]:
if not isinstance(details, dict):
return {}
safe: dict[str, Any] = {}
for key, value in details.items():
if value is None or isinstance(value, (str, int, float, bool)):
safe[str(key)] = value
else:
safe[str(key)] = str(value)
return safe
def extract_client_ip(http_request: FastapiRequest | None) -> str | None:
if http_request is None:
return None
xff = str(http_request.headers.get("x-forwarded-for") or "").strip()
if xff:
first = xff.split(",")[0].strip()
if first:
return first
if http_request.client and http_request.client.host:
return str(http_request.client.host)
return None
def _emit_suspicious_denied_download_alert(
db: Session,
*,
actor_role: str,
actor_subject: str,
actor_ip: str | None,
) -> None:
if not actor_subject and not actor_ip:
return
since = utcnow() - timedelta(minutes=SUSPICIOUS_DENY_WINDOW_MINUTES)
query = db.query(func.count(SecurityAuditLog.id)).filter(
SecurityAuditLog.created_at >= since,
SecurityAuditLog.action == "DOWNLOAD_OBJECT",
SecurityAuditLog.allowed.is_(False),
)
if actor_subject:
query = query.filter(SecurityAuditLog.actor_subject == actor_subject)
elif actor_ip:
query = query.filter(SecurityAuditLog.actor_ip == actor_ip)
denied_count = int(query.scalar() or 0)
if denied_count >= SUSPICIOUS_DENY_THRESHOLD:
logger.warning(
"SECURITY_ALERT repeated denied download attempts role=%s subject=%s ip=%s count=%s window_min=%s",
actor_role,
actor_subject or "-",
actor_ip or "-",
denied_count,
SUSPICIOUS_DENY_WINDOW_MINUTES,
)
def record_file_security_event(
db: Session,
*,
actor_role: str,
actor_subject: str,
actor_ip: str | None,
action: str,
scope: str,
allowed: bool,
reason: str | None = None,
object_key: str | None = None,
request_id: str | uuid.UUID | None = None,
attachment_id: str | uuid.UUID | None = None,
details: dict[str, Any] | None = None,
responsible: str | None = None,
persist_now: bool = False,
) -> None:
# Security telemetry must not block business flow if DB log write fails.
try:
bind = db.get_bind()
if bind is None or not inspect(bind).has_table("security_audit_log"):
return
row = SecurityAuditLog(
actor_role=str(actor_role or "UNKNOWN").upper(),
actor_subject=str(actor_subject or "").strip(),
actor_ip=str(actor_ip or "").strip() or None,
action=str(action or "").strip().upper() or "UNKNOWN",
scope=str(scope or "").strip().upper() or "UNKNOWN",
object_key=str(object_key or "").strip() or None,
request_id=_uuid_or_none(request_id),
attachment_id=_uuid_or_none(attachment_id),
allowed=bool(allowed),
reason=(str(reason)[:400] if reason is not None else None),
details=_safe_details(details),
responsible=str(responsible or "Администратор системы").strip() or "Администратор системы",
)
db.add(row)
db.flush()
if not bool(allowed) and str(action or "").upper() == "DOWNLOAD_OBJECT":
_emit_suspicious_denied_download_alert(
db,
actor_role=row.actor_role,
actor_subject=row.actor_subject,
actor_ip=row.actor_ip,
)
if persist_now:
db.commit()
except SQLAlchemyError:
if persist_now:
try:
db.rollback()
except Exception:
logger.debug("security_audit_rollback_failed", exc_info=True)
def record_pii_access_event(
db: Session,
*,
actor_role: str,
actor_subject: str,
actor_ip: str | None,
action: str,
scope: str,
request_id: str | uuid.UUID | None = None,
details: dict[str, Any] | None = None,
responsible: str | None = None,
persist_now: bool = False,
) -> None:
record_file_security_event(
db,
actor_role=actor_role,
actor_subject=actor_subject,
actor_ip=actor_ip,
action=action,
scope=scope,
allowed=True,
reason=None,
object_key=None,
request_id=request_id,
details=details,
responsible=responsible,
persist_now=persist_now,
)