From dc1fadd9282b8f9c6e4717e877c1f9496d8c48e4 Mon Sep 17 00:00:00 2001
From: TronoSfera <119615520+TronoSfera@users.noreply.github.com>
Date: Mon, 19 Jan 2026 11:26:11 +0300
Subject: [PATCH] Add admin login page for web UI
---
server/auth.py | 15 ++++++---
server/main.py | 63 +++++++++++++++++++++++++++++++++++--
server/templates/base.html | 9 +++++-
server/templates/login.html | 23 ++++++++++++++
4 files changed, 102 insertions(+), 8 deletions(-)
create mode 100644 server/templates/login.html
diff --git a/server/auth.py b/server/auth.py
index 0ddd3c7..e95b081 100644
--- a/server/auth.py
+++ b/server/auth.py
@@ -19,7 +19,7 @@ from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
-from fastapi import Depends, HTTPException, status
+from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
@@ -36,7 +36,7 @@ ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "60")
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
-oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/login")
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/login", auto_error=False)
def hash_password(password: str) -> str:
@@ -58,7 +58,9 @@ def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta]
async def get_current_user(
- token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
+ request: Request,
+ token: str | None = Depends(oauth2_scheme),
+ db: Session = Depends(get_db),
) -> models.User:
"""Retrieve the current user from a JWT token."""
credentials_exception = HTTPException(
@@ -66,8 +68,11 @@ async def get_current_user(
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
+ raw_token = token or request.cookies.get("access_token")
+ if not raw_token:
+ raise credentials_exception
try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+ payload = jwt.decode(raw_token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: int | None = payload.get("sub")
if user_id is None:
raise credentials_exception
@@ -93,4 +98,4 @@ async def get_current_admin(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough privileges",
)
- return current_user
\ No newline at end of file
+ return current_user
diff --git a/server/main.py b/server/main.py
index 31b2337..fba077f 100644
--- a/server/main.py
+++ b/server/main.py
@@ -37,9 +37,10 @@ from fastapi import (
Request,
Response,
)
-from fastapi.responses import HTMLResponse, FileResponse
+from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.security import OAuth2PasswordRequestForm
+from fastapi.exception_handlers import http_exception_handler
from sqlalchemy.orm import Session
from . import models, schemas, auth, storage as storage_module, database
@@ -50,6 +51,17 @@ templates = Jinja2Templates(directory=os.path.join(os.path.dirname(__file__), "t
storage = storage_module.get_storage()
+@app.exception_handler(HTTPException)
+async def custom_http_exception_handler(request: Request, exc: HTTPException) -> Response:
+ if (
+ exc.status_code == status.HTTP_401_UNAUTHORIZED
+ and "text/html" in request.headers.get("accept", "")
+ and not request.url.path.startswith("/api")
+ ):
+ return RedirectResponse(url="/login")
+ return await http_exception_handler(request, exc)
+
+
@app.on_event("startup")
async def on_startup() -> None:
# Create database tables if they do not exist
@@ -649,6 +661,53 @@ async def update_client_config(
# ======== Web interface routes =========
+@app.get("/login", response_class=HTMLResponse)
+async def login_page(request: Request) -> Response:
+ return templates.TemplateResponse(
+ "login.html",
+ {
+ "request": request,
+ "title": "Admin Login",
+ },
+ )
+
+
+@app.post("/login", response_class=HTMLResponse)
+async def login_submit(
+ request: Request,
+ username: str = Form(...),
+ password: str = Form(...),
+ db: Session = Depends(database.get_db),
+) -> Response:
+ user = db.query(models.User).filter(models.User.username == username).first()
+ if not user or not auth.verify_password(password, user.hashed_password):
+ return templates.TemplateResponse(
+ "login.html",
+ {
+ "request": request,
+ "title": "Admin Login",
+ "error": "Invalid username or password.",
+ },
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ )
+ access_token = auth.create_access_token(data={"sub": user.id, "is_admin": user.is_admin})
+ response = RedirectResponse(url="/clients", status_code=status.HTTP_303_SEE_OTHER)
+ response.set_cookie(
+ "access_token",
+ access_token,
+ httponly=True,
+ samesite="lax",
+ )
+ return response
+
+
+@app.get("/logout")
+async def logout() -> Response:
+ response = RedirectResponse(url="/login", status_code=status.HTTP_303_SEE_OTHER)
+ response.delete_cookie("access_token")
+ return response
+
+
@app.get("/clients", response_class=HTMLResponse)
async def list_clients_page(
request: Request,
@@ -745,4 +804,4 @@ async def root_redirect(request: Request) -> Response:
redirect keeps the root endpoint simple and ensures there is no
ambiguity with multiple handlers for ``/``.
"""
- return Response(status_code=303, headers={"Location": "/clients"})
\ No newline at end of file
+ return Response(status_code=303, headers={"Location": "/clients"})
diff --git a/server/templates/base.html b/server/templates/base.html
index baca42b..1df06b5 100644
--- a/server/templates/base.html
+++ b/server/templates/base.html
@@ -12,10 +12,17 @@
{% block content %}{% endblock %}