mirror of
https://github.com/TronoSfera/Law.git
synced 2026-05-18 18:13:46 +03:00
23 lines
711 B
Python
23 lines
711 B
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from app.db.session import SessionLocal
|
|
from app.models.otp_session import OtpSession
|
|
from app.workers.celery_app import celery_app
|
|
|
|
|
|
@celery_app.task(name="app.workers.tasks.security.cleanup_expired_otps")
def cleanup_expired_otps():
    """Delete OTP sessions whose ``expires_at`` is at or before the current UTC time.

    Opens a fresh ``SessionLocal`` session, counts every ``OtpSession`` row,
    bulk-deletes the rows with ``expires_at <= now`` and commits.

    Returns:
        A dict with two integer entries: ``"checked"`` (the row count taken
        before the delete) and ``"deleted"`` (the value returned by the bulk
        delete).

    Raises:
        Exception: anything raised while querying, deleting or committing is
            re-raised after the session has been rolled back.
    """
    # Timezone-aware "now" in UTC; rows expiring exactly at this instant are
    # included because the filter uses <=.
    cutoff = datetime.now(timezone.utc)
    session = SessionLocal()
    try:
        # Count is taken before the delete, so it includes the expired rows.
        checked_count = session.query(OtpSession).count()

        expired_rows = session.query(OtpSession).filter(
            OtpSession.expires_at <= cutoff
        )
        removed_count = expired_rows.delete(synchronize_session=False)

        session.commit()
        summary = {"checked": int(checked_count), "deleted": int(removed_count)}
        return summary
    except Exception:
        # Undo any partial work before propagating the error to the caller.
        session.rollback()
        raise
    finally:
        # Runs on both the success and the failure path.
        session.close()
|