2025, Oct 31 13:47
Как устранить database is locked в FastAPI и SQLite с aiosqlite и WAL
Разбираем причину ошибки database is locked в FastAPI c SQLite и показываем решение: асинхронный SQLAlchemy на aiosqlite и включение WAL для стабильных записей
FastAPI на SQLite может споткнуться о параллельность. Классический симптом: почти одновременно прилетают запросы к двум эндпоинтам — один проходит, второй падает на db.commit() с sqlite3.OperationalError: database is locked. Пока один писатель держит блокировку, второй продолжить не может, и если приложение блокирует событийный цикл синхронными вызовами к базе, ошибка воспроизводится стабильно. Ниже — компактный, воспроизводимый пример и практичный способ смягчить проблему так, чтобы это подошло продакшену.
Минимальная конфигурация, которая вызывает блокировку
Пример использует два маршрута. Первый вставляет запись и делает паузу перед коммитом. Второй заранее «подталкивает» блокировку через flush, ждёт дольше и затем коммитит. Оба обработчика объявлены как асинхронные, но сама сессия базы данных — синхронная. Имена немного упрощены для ясности; механика прежняя.
import asyncio
import sqlite3
import threading
import time
import uuid
from loguru import logger
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
DB_URI = "sqlite:///./test_locked.db"
sync_engine = create_engine(DB_URI, connect_args={"check_same_thread": False})
SyncSessionMaker = sessionmaker(autocommit=False, autoflush=True, bind=sync_engine)
OrmBase = declarative_base()
app = FastAPI()
class Record(OrmBase):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
OrmBase.metadata.create_all(bind=sync_engine)
def acquire_sync_session() -> Session:
    dbh = SyncSessionMaker()
    try:
        yield dbh
    finally:
        dbh.close()
@app.post("/session_a")
async def run_a(db: Session = Depends(acquire_sync_session)):
    logger.info("A start")
    token = str(uuid.uuid4())
    row = Record(name=f"session_a{token}")
    db.add(row)
    await asyncio.sleep(0.5)
    logger.info(f"A commit {token}")
    db.commit()
    return {"status": "A committed"}
@app.post("/session_b")
async def run_b(db: Session = Depends(acquire_sync_session)):
    logger.info("B start")
    await asyncio.sleep(0.1)
    token = str(uuid.uuid4())
    row = Record(name=f"session_b{token}")
    db.add(row)
    db.flush()
    logger.info(f"B flush {token}")
    await asyncio.sleep(1)
    db.commit()
    logger.info(f"B commit {token}")
    return {"status": "B committed"}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Почему появляется блокировка
SQLite допускает только одного писателя одновременно. В нашей ситуации сессия B рано получает блокировку на запись через flush и удерживает её во время ожидания; в это время сессия A доходит до commit и сталкивается с блокировкой. Есть и второй пласт: вызовы к базе здесь синхронные. Несмотря на то что обработчики — async, db.flush() и db.commit() блокируют поток до завершения. В FastAPI асинхронные хендлеры выполняются в главном событийном цикле, а синхронные функции уезжают в пул потоков. Если внутри async-эндпоинтов оставлять синхронный I/O базы, это блокирует event loop и усугубляет проблемы тайминга и конкуренции. В официальной документации FastAPI есть подробное объяснение того, как async и sync взаимодействуют в рамках фреймворка и событийного цикла.
Как смягчить проблему: сделать доступ к базе по‑настоящему асинхронным
Переведите слой доступа к базе на aiosqlite через асинхронный движок SQLAlchemy — тогда commit и flush будут отдавать управление обратно событийному циклу. Это не отменяет модель «единственного писателя» в SQLite, но убирает лишние блокировки, сглаживает планирование и на практике делает поведение приложения предсказуемым при описанной нагрузке. Ниже — готовая к использованию версия того же приложения на асинхронном SQLAlchemy:
import asyncio
import uuid
import logging
from fastapi import FastAPI, Depends
from sqlalchemy import Column, Integer, String, text
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
log = logging.getLogger("uvicorn.error")
ASYNC_DB_URI = "sqlite+aiosqlite:///./test_locked.db"
async_engine = create_async_engine(ASYNC_DB_URI, connect_args={"check_same_thread": False})
AsyncSessionMaker = async_sessionmaker(autocommit=False, autoflush=True, bind=async_engine)
AsyncBase = declarative_base()
app = FastAPI()
class RowItem(AsyncBase):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
async def bootstrap_schema():
    async with async_engine.begin() as conn:
        await conn.run_sync(AsyncBase.metadata.drop_all)
        await conn.run_sync(AsyncBase.metadata.create_all)
async def get_async_session() -> AsyncSession:
    async with AsyncSessionMaker() as sess:
        yield sess
@app.post("/session_a")
async def route_a(db: AsyncSession = Depends(get_async_session)):
    log.info("A start")
    token = str(uuid.uuid4())
    row = RowItem(name=f"session_a{token}")
    db.add(row)
    await asyncio.sleep(0.5)
    log.info(f"A commit {token}")
    await db.commit()
    return {"status": "A committed"}
@app.post("/session_b")
async def route_b(db: AsyncSession = Depends(get_async_session)):
    log.info("B start")
    await asyncio.sleep(0.1)
    token = str(uuid.uuid4())
    row = RowItem(name=f"session_b{token}")
    db.add(row)
    await db.flush()
    log.info(f"B flush {token}")
    await asyncio.sleep(1)
    await db.commit()
    log.info(f"B commit {token}")
    return {"status": "B committed"}
if __name__ == "__main__":
    import uvicorn
    asyncio.run(bootstrap_schema())
    uvicorn.run(app, host="0.0.0.0", port=8000)
Параллельный запуск этих эндпоинтов даёт ожидаемые ответы 200 — без падений с “database is locked”. Ключевой момент: commit и flush теперь awaitable и работают в кооперации с event loop.
Сделайте SQLite дружелюбнее к конкуренции через WAL
Переключение режима журнала в WAL (Write-Ahead Logging) дополнительно помогает, улучшая совместимость чтения/записи при одновременном доступе. Включить его на старте можно так:
from sqlalchemy import text
async def activate_wal():
    async with async_engine.connect() as conn:
        res = await conn.execute(text("PRAGMA journal_mode=WAL;"))
        print("Journal mode:", res.scalar())
# вызовите asyncio.run(activate_wal()) во время инициализации
Согласно документации SQLite, WAL часто лучше подходит для серверных паттернов доступа: он быстрее и обеспечивает больше параллельности. Правило «единственного писателя» это не отменяет, но в данном профиле — полезный рычаг настройки.
Почему это важно для FastAPI и SQLite
В асинхронном веб-стеке смешивание синхронных вызовов к БД внутри async-эндпоинтов ведёт к блокировкам, хрупким таймингам и неожиданным конфликтам за блокировки. Связка aiosqlite и асинхронного движка SQLAlchemy выстраивает работу БД в одной плоскости с event loop, убирает лишние стопоры и снижает риск столкновений коммитов при небольшой и средней конкуренции. С включённым WAL и autoflush=True такой подход хорошо работает в сценариях с невысокой одновременностью. Для более тяжёлых и надёжных требований к параллельной записи разумно рассмотреть другую СУБД, например Postgres. Прагматичный способ оценить запас — поднять приложение, прогнать нагрузочный тест и проверить, соответствует ли количество вставленных записей числу запросов.
Итоги
Если при одновременных записях в FastAPI с SQLite вы ловите “database is locked”, самый быстрый путь к стабильности — перевести слой данных на aiosqlite и включить WAL. Помните модель FastAPI: асинхронные обработчики работают в основном event loop, а синхронный код уезжает в пул потоков. Когда всё остаётся асинхронным, commit и flush могут уступать управление, и планировщик чаще поступает правильно. Если блокировки всё ещё упираются в пределы, настройте SQLite — тот же WAL и таймаут соединения — и оберните критические записи в простой try/except. А если нагрузка выходит за рамки низкой конкуренции, присмотритесь к СУБД, рассчитанным на многописательные сценарии.
Статья основана на вопросе на StackOverflow от lee и ответе от danielcahall.