2025, Dec 06 09:03

Как защитить маршруты в FastAPI: Depends, APIRouter и OAuth2

Почему маршруты FastAPI остаются открытыми и как закрыть их с помощью зависимостей: Depends на эндпоинте или APIRouter, примеры и советы по защите и OAuth2.

Если вы подключили аутентификацию к проекту на FastAPI, но конечные точки по-прежнему отвечают без проверок, чаще всего причина в том, что на этих обработчиках не подключена зависимость. Декларации бэкенда безопасности или выдачи куки/токенов недостаточно: фреймворк применяет проверку только там, где зависимость явно указана — на конкретном обработчике или на уровне роутера.

Незащищённые маршруты

Ниже — упрощённое приложение: предполагалось защитить только маршруты, связанные с файлами, однако сейчас они доступны без входа. Приложение действительно выдаёт токен в cookie при регистрации/логине, но ни один файловый эндпоинт не требует зависимости, которая удостоверяет пользователя. Поэтому всё остаётся открытым.

import uvicorn
from fastapi import FastAPI, HTTPException, Response, Depends, UploadFile
from fastapi.responses import StreamingResponse, FileResponse
from pydantic import BaseModel, Field
from typing import Annotated, List
from authx import AuthX, AuthXConfig
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from passlib.context import CryptContext


# Хеширование паролей
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")


# Инициализация приложения FastAPI и сессии БД
api = FastAPI()  # Экземпляр приложения FastAPI
db_engine = create_async_engine("sqlite+aiosqlite:///users.db", echo=True)  # Асинхронный движок БД
session_factory = async_sessionmaker(db_engine, expire_on_commit=False)  # Фабрика асинхронных сессий


async def acquire_db():
    "Access DB session"
    async with session_factory() as s:
        yield s


DbSessionDep = Annotated[AsyncSession, Depends(acquire_db)]  # Зависимость для сессии БД


class OrmBase(DeclarativeBase):
    pass


class AccountRow(OrmBase):  # Таблица пользователей
    __tablename__ = "users"

    # Колонки
    uid: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, nullable=False)
    username: Mapped[str] = mapped_column(unique=True, index=True)
    password: Mapped[str]


class AccountIn(BaseModel):
    username: str = Field(max_length=10)
    password: str = Field(min_length=8, max_length=16)


# Конфигурация AuthX
jwt_conf = AuthXConfig()
jwt_conf.JWT_SECRET_KEY = "kirieshki"
jwt_conf.JWT_ACCESS_COOKIE_NAME = "access_token"
jwt_conf.JWT_TOKEN_LOCATION = ['cookies']
authx_mgr = AuthX(config=jwt_conf)


@api.post("/register",
          tags=["Авторизация"],
          summary="Зарегистрировать аккаунт")
async def register_account(creds: AccountIn,
                           response: Response,
                           session: DbSessionDep):
    "User registration"
    check_stmt = select(AccountRow).where(AccountRow.username == creds.username)
    res = await session.execute(check_stmt)
    if res.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="User already exist")

    hashed = pwd_ctx.hash(creds.password)

    row = AccountRow(username=creds.username, password=hashed)
    session.add(row)
    await session.commit()
    await session.refresh(row)

    token = authx_mgr.create_access_token(uid=str(row.uid))
    authx_mgr.set_access_cookies(token, response)

    return {"message": "User has been registered"}


@api.post("/login",
          tags=["Авторизация"],
          summary="Войти в аккаунт")
async def login_account(creds: AccountIn,
                        response: Response,
                        session: DbSessionDep):
    "User login"
    check_stmt = select(AccountRow).where(AccountRow.username == creds.username)
    res = await session.execute(check_stmt)
    user = res.scalar_one_or_none()

    if not user or not pwd_ctx.verify(creds.password, user.password):
        raise HTTPException(status_code=401, detail="Invalid username or password")

    token = authx_mgr.create_access_token(uid=str(user.uid))
    authx_mgr.set_access_cookies(token, response)

    return {"Success": True}


@api.post("/reset",
          tags=["Работа с БД"],
          summary="Сброс БД")
async def reset_db():
    "Reset DB"
    async with db_engine.begin() as conn:
        await conn.run_sync(OrmBase.metadata.drop_all)
        await conn.run_sync(OrmBase.metadata.create_all)
    return {"База данных успешно сброшена": True}


stored_files = []  # Хранилище загружённых файлов в памяти


@api.get("/get_filenames",
         tags=["Получение файлов"],
         summary="Получить названия файлов")
async def list_filenames():
    if len(stored_files) == 0:
        return {"File list is empty"}
    else:
        return stored_files


def stream_chunks(filename: str):
    "Chunked file streaming"
    with open(filename, "rb") as file:
        while chunk := file.read(1024 * 1024):
            yield chunk


@api.get("/get_files",
         tags=["Получение файлов"],
         summary="Получить файл")
async def fetch_file(filename: str):
    return FileResponse(filename)


@api.get("/streaming/{filename}",
         tags=["Получение файлов"],
         summary="Получение файла в стриминге")
async def stream_file(filename: str):
    return StreamingResponse(stream_chunks(filename))


@api.post("/upload",
          tags=["Добавление файлов"],
          summary="Загрузка файла")
async def upload_single(uploaded_file: UploadFile):
    "Upload single file"
    file = uploaded_file.file
    filename = uploaded_file.filename
    with open(f"1_{filename}", "wb") as f:
        f.write(file.read())
    stored_files.append(f"1_{filename}")
    return {"File was uploaded": True}


@api.post("/upload_multiple",
          tags=["Добавление файлов"],
          summary="Загрузка нескольких файлов")
async def upload_many(uploaded_files: list[UploadFile]):
    for uploaded_file in upload_many:
        file = uploaded_file.file
        filename = uploaded_file.filename
        with open(f"1_{filename}", "wb") as f:
            f.write(file.read())
        stored_files.append(f"1_{filename}")
    return {"Multiple files were uploaded": True}


if __name__ == "__main__":
    uvicorn.run("main:api", host="127.0.0.1", port=8000, reload=True)

Почему маршруты остаются открытыми

FastAPI не защищает маршруты автоматически только потому, что создаётся токен или настроен бэкенд безопасности. Защита работает лишь там, где вы внедряете зависимость, выполняющую аутентификацию. Если обработчик не зависит от такой зависимости, он остаётся публичным. В приведённом коде ни один маршрут из блока с файлами не обёрнут в Depends, поэтому любой запрос проходит без ограничений.

Два простых способа защитить маршруты

Первый вариант — добавить зависимость безопасности к каждому эндпоинту, который должен быть закрыт. Это самый явный подход: просто, прозрачно и легко проверять. Для примера ниже в роли защиты используется HTTPBasic, прикреплённый непосредственно к одному маршруту. Логика в том, что обработчик не запустится, пока зависимость не будет успешно разрешена.

from fastapi import FastAPI, Depends
from fastapi.security import HTTPBasic, HTTPBasicCredentials

svc = FastAPI()
basic_guard = HTTPBasic()


@svc.get("/")
async def index():
    return "OK"


@svc.get("/protected")
async def protected(creds: HTTPBasicCredentials = Depends(basic_guard)):
    return {"username": creds.username, "password": creds.password}

Второй вариант — сгруппировать закрытые эндпоинты в отдельный APIRouter и один раз повесить на него зависимость. Тогда любой маршрут, зарегистрированный на этом роутере, автоматически становится защищённым. Такой приём хорошо масштабируется и соответствует модульной организации роутеров в крупных приложениях.

Рабочий пример с защищённым роутером

Ниже показана рабочая схема: проверка учётных данных, сохранение аутентифицированного пользователя в request.state и одновременное предоставление открытых и закрытых маршрутов. Защищённый роутер создаётся с зависимостью, которую нужно пройти для каждого подключённого к нему эндпоинта. Тестовые данные: имя пользователя — admin, пароль — password.

from fastapi import FastAPI, APIRouter, Depends, Request, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import hashlib
from secrets import compare_digest
from pydantic import BaseModel

app2 = FastAPI()
auth_scheme = HTTPBasic()


demo_users = {
    "admin": {
        "username": "admin",
        "full_name": "Test Admin",
        "email": "admin@example.com",
        "hashed_password": "c255f02d5cb0812495fcf301d3239f80693f349d6b423a4f1868997c6b211eda",  # Пароль в открытом виде: password
        "salt": "fc96c333c19cfbf9f99b916c0dc82db1e2f8d88fa72e5f534147e25a19341fa3",
        "disabled": False,
        "priviliged": True,
    }
}


class UserOut(BaseModel):
    username: str
    full_name: str
    email: str
    disabled: bool
    priviliged: bool


class UserRecord(UserOut):
    hashed_password: str
    salt: str


# Захешировать новый открытый пароль с солью; возвращает хеш и соль
def get_password_hash_and_salt(plain_password: str):
    salt = secrets.token_hex(32)
    hashed_password = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), salt.encode('utf-8'), 100000)
    return hashed_password.hex(), salt


# Проверить открытый пароль по сохранённым хешу и соли
def verify_password(plain_password: str, user: UserRecord):
    derived = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), user.salt.encode('utf-8'), 100000)
    current_bytes = derived.hex().encode('utf-8')
    correct_bytes = user.hashed_password.encode('utf-8')
    return compare_digest(current_bytes, correct_bytes)


def fetch_user(username: str):
    user = demo_users.get(username)
    if user:
        return UserRecord(**user)


def assert_credentials(request: Request, creds: HTTPBasicCredentials = Depends(auth_scheme)):
    user = fetch_user(creds.username)
    if user and verify_password(creds.password, user):
        request.state.user = UserOut(**user.model_dump())
    else:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Basic"},
        )


open_router = APIRouter()
locked_router = APIRouter(dependencies=[Depends(assert_credentials)])


@open_router.get("/")
async def main_open():
    return "OK"


@locked_router.get("/protected")
async def main_locked(request: Request):
    return request.state.user


app2.include_router(open_router)
app2.include_router(locked_router)

Обратите внимание на использование request.state: так объект пользователя остаётся доступен в эндпоинтах, когда зависимость навешана на сам роутер. Возвращаемое значение зависимости, заданной на уровне роутера, не попадёт в параметры обработчика; запись в request.state делает данные доступными в течение текущего жизненного цикла запроса. Как отмечается в источниках, на которые ссылалась первоначальная заметка, состояние, доступное через request, — это неглубокая копия состояния lifespan, поэтому данные, помещённые в request.state, видны только в рамках одного запроса.

Получение данных напрямую из зависимости

Если удобнее передавать результат зависимости прямо в обработчик, минуя request.state, объявляйте зависимость на самом эндпоинте. Придётся повторить Depends для каждого закрытого маршрута, зато сигнатуры обработчиков останутся явно описанными.

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel
from secrets import compare_digest
import hashlib

api3 = FastAPI()
auth_basic = HTTPBasic()


class UserPublic(BaseModel):
    username: str
    full_name: str
    email: str
    disabled: bool
    priviliged: bool


# ... используйте те же demo_users, verify_password и т. п.

def verify_creds(creds: HTTPBasicCredentials = Depends(auth_basic)):
    # Предполагается, что fetch_user и verify_password такие же, как в предыдущем примере
    user = fetch_user(creds.username)
    if user and verify_password(creds.password, user):
        return UserPublic(**user.model_dump())
    else:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Basic"},
        )


@api3.get("/protected")
async def protected_here(user: UserPublic = Depends(verify_creds)):
    return user

Вариант с OAuth2 (токены)

Если вам ближе токен-ориентированные схемы, можно перейти на OAuth2. Ниже показано, как прикрепить OAuth2PasswordBearer как зависимость к защищённому роутеру. Это лишь набросок; сам эндпоинт выдачи токена и вся связка аутентификации должны соответствовать официальному учебнику.

from fastapi import FastAPI, APIRouter, Depends
from fastapi.security import OAuth2PasswordBearer

app4 = FastAPI()
oauth2 = OAuth2PasswordBearer(tokenUrl="token")
public_router = APIRouter()
secure_router = APIRouter(dependencies=[Depends(oauth2)])


@public_router.get("/")
async def alive():
    return "OK"


@secure_router.get("/protected")
async def show_token(token: str = Depends(oauth2)):
    return {"token": token}


app4.include_router(public_router)
app4.include_router(secure_router)

Почему это важно знать

В FastAPI безопасность — составная и настраиваемая. Но вместе с гибкостью приходит явное требование: подключайте зависимость туда, где хотите добиться принудительной проверки. Глобальная настройка без привязки к эндпоинтам создаёт иллюзию защиты и оставляет маршруты открытыми. Организация защиты на уровне роутеров даёт чистое разделение открытой и закрытой зон, а зависимости на уровне обработчиков делают контракт каждого маршрута очевидным.

Итоги

Если маршрут должен быть закрыт — внедрите зависимость безопасности. Делайте это на уровне эндпоинта для максимальной ясности или на уровне APIRouter — для удобной группировки и масштабирования. Когда зависимость расположена на роутере, передавайте данные в обработчики через request.state; при зависимостях на уровне эндпоинта просто возвращайте объект пользователя из зависимости и принимайте его как параметр. Независимо от того, используете ли вы HTTPBasic для демонстраций или OAuth2 для токенов, принцип один: указывайте Depends именно там, где нужна аутентификация.