Oct 27, 2025, 03:00
How to prevent SQLite 'database is locked' in FastAPI: move to async SQLAlchemy (aiosqlite) and enable WAL
Fix the FastAPI + SQLite "database is locked" error under concurrent writes with async SQLAlchemy (aiosqlite), enable WAL, and stop blocking the event loop in async endpoints
FastAPI on SQLite can stumble over concurrency. A classic symptom: two nearly simultaneous requests hit two endpoints; one succeeds, the other dies on db.commit() with sqlite3.OperationalError: database is locked. While one writer holds the lock, a second writer cannot proceed, and if the application also blocks the event loop with synchronous database calls, the error reproduces reliably. Below is a compact, reproducible setup and a practical way to mitigate the issue in a production-friendly manner.
Minimal setup that triggers the lock
The example uses two routes. The first inserts a row and waits before committing. The second acquires the write lock earlier with a flush, waits longer, and then commits. Both run as async path operations, but the database session itself is synchronous. Names are changed for clarity; the mechanics are unchanged.
import asyncio
import uuid

from loguru import logger
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker, Session
DB_URI = "sqlite:///./test_locked.db"
sync_engine = create_engine(DB_URI, connect_args={"check_same_thread": False})
SyncSessionMaker = sessionmaker(autocommit=False, autoflush=True, bind=sync_engine)
OrmBase = declarative_base()
app = FastAPI()
class Record(OrmBase):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
OrmBase.metadata.create_all(bind=sync_engine)
def acquire_sync_session():
    # Dependency yielding a synchronous Session; every call on it blocks the event loop
    dbh = SyncSessionMaker()
    try:
        yield dbh
    finally:
        dbh.close()
@app.post("/session_a")
async def run_a(db: Session = Depends(acquire_sync_session)):
    logger.info("A start")
    token = str(uuid.uuid4())
    row = Record(name=f"session_a{token}")
    db.add(row)
    await asyncio.sleep(0.5)  # simulate work while the transaction is open
    logger.info(f"A commit {token}")
    db.commit()
    return {"status": "A committed"}
@app.post("/session_b")
async def run_b(db: Session = Depends(acquire_sync_session)):
    logger.info("B start")
    await asyncio.sleep(0.1)
    token = str(uuid.uuid4())
    row = Record(name=f"session_b{token}")
    db.add(row)
    db.flush()  # the INSERT hits SQLite here, taking the write lock early
    logger.info(f"B flush {token}")
    await asyncio.sleep(1)  # keep holding the write lock while A tries to commit
    db.commit()
    logger.info(f"B commit {token}")
    return {"status": "B committed"}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
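To reproduce the failure on demand, fire both endpoints at nearly the same time. Here is a small driver script, a sketch assuming the httpx client library and the server running on localhost:8000; any HTTP client capable of concurrent requests works just as well:
import asyncio
import httpx

async def hammer():
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8000", timeout=10) as client:
        # Fire both requests almost simultaneously: B flushes first and holds
        # the write lock while A tries to commit
        results = await asyncio.gather(
            client.post("/session_a"),
            client.post("/session_b"),
            return_exceptions=True,
        )
    for res in results:
        print(res)

asyncio.run(hammer())
Against the synchronous version above, one of the two calls typically comes back as a 500 caused by the locked database; run against the async rewrite below, both should return 200.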
Why the lock shows up
SQLite enforces a single writer at a time. In this scenario, session B grabs the write lock early via flush and keeps it while sleeping; session A then reaches commit and collides with the lock. There is another layer: the database calls here are synchronous. Even though the route functions are async, db.flush() and db.commit() block the event loop until they finish. In FastAPI, async path functions run on the main event loop, while synchronous path functions are dispatched to a threadpool. Keeping synchronous database I/O inside async endpoints therefore stalls the event loop and aggravates timing and contention. The official FastAPI documentation has a detailed explainer on how async and sync code interact in the framework.
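Before rewriting the data layer, one stopgap is to keep the blocking calls off the event loop. FastAPI re-exports Starlette's run_in_threadpool helper, so the synchronous commit can be dispatched to a worker thread; a minimal sketch against the synchronous session above (the /session_a_threaded route name is illustrative):
from fastapi.concurrency import run_in_threadpool

@app.post("/session_a_threaded")
async def run_a_threaded(db: Session = Depends(acquire_sync_session)):
    row = Record(name=f"session_a{uuid.uuid4()}")
    db.add(row)
    await asyncio.sleep(0.5)
    # Run the blocking commit in the threadpool so the event loop stays free
    await run_in_threadpool(db.commit)
    return {"status": "A committed"}
This keeps the loop responsive, but it does not lift SQLite's single-writer rule; the cleaner fix is to make the whole database path async.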
Mitigation: make the database path truly async
Converting the database layer to use aiosqlite through SQLAlchemy’s async engine lets commit and flush yield control back to the event loop. This does not eliminate SQLite’s single-writer model, but it removes unnecessary blocking, smooths out scheduling, and, in practice, makes the app behave as expected under the described load pattern. Below is a directly usable version of the same application with asynchronous SQLAlchemy:
import asyncio
import logging
import uuid

from fastapi import FastAPI, Depends
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import declarative_base
log = logging.getLogger("uvicorn.error")
ASYNC_DB_URI = "sqlite+aiosqlite:///./test_locked.db"
async_engine = create_async_engine(ASYNC_DB_URI, connect_args={"check_same_thread": False})
AsyncSessionMaker = async_sessionmaker(autocommit=False, autoflush=True, bind=async_engine)
AsyncBase = declarative_base()
app = FastAPI()
class RowItem(AsyncBase):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
async def bootstrap_schema():
    # Demo bootstrap: drop and recreate the table so each run starts clean
    async with async_engine.begin() as conn:
        await conn.run_sync(AsyncBase.metadata.drop_all)
        await conn.run_sync(AsyncBase.metadata.create_all)
async def get_async_session():
    async with AsyncSessionMaker() as sess:
        yield sess
@app.post("/session_a")
async def route_a(db: AsyncSession = Depends(get_async_session)):
    log.info("A start")
    token = str(uuid.uuid4())
    row = RowItem(name=f"session_a{token}")
    db.add(row)
    await asyncio.sleep(0.5)
    log.info(f"A commit {token}")
    await db.commit()
    return {"status": "A committed"}
@app.post("/session_b")
async def route_b(db: AsyncSession = Depends(get_async_session)):
    log.info("B start")
    await asyncio.sleep(0.1)
    token = str(uuid.uuid4())
    row = RowItem(name=f"session_b{token}")
    db.add(row)
    await db.flush()  # still takes SQLite's write lock, but the await lets other tasks run
    log.info(f"B flush {token}")
    await asyncio.sleep(1)
    await db.commit()
    log.info(f"B commit {token}")
    return {"status": "B committed"}
if __name__ == "__main__":
    import uvicorn
    asyncio.run(bootstrap_schema())
    uvicorn.run(app, host="0.0.0.0", port=8000)
Running these endpoints concurrently produces the expected 200 responses without the “database is locked” failure. The key is that commit and flush are now awaitable and cooperate with the event loop.
Make SQLite friendlier to concurrency with WAL
Switching the journal mode to WAL (Write-Ahead Logging) can further help by improving read/write concurrency characteristics. You can enable it at startup like this:
from sqlalchemy import text
async def activate_wal():
    async with async_engine.connect() as conn:
        res = await conn.execute(text("PRAGMA journal_mode=WAL;"))
        print("Journal mode:", res.scalar())
# run asyncio.run(activate_wal()) during initialization
Per the SQLite documentation, WAL is often a better fit for server-style access patterns because it is faster and allows reads to proceed concurrently with a write. It does not change the single-writer rule, but it is a practical tuning knob for this profile. Note that journal_mode=WAL is persistent: it is recorded in the database file and applies to all subsequent connections, so running the pragma once at startup is enough.
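An alternative, if you would rather not run a one-off task, is to apply pragmas from a connection event hook; with SQLAlchemy's asyncio extension, listeners attach to async_engine.sync_engine. A sketch, where the busy_timeout value of 5000 ms is an illustrative choice that makes SQLite wait for a lock instead of failing immediately:
from sqlalchemy import event

@event.listens_for(async_engine.sync_engine, "connect")
def set_sqlite_pragmas(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL;")   # persistent, stored in the DB file
    cursor.execute("PRAGMA busy_timeout=5000;")  # wait up to 5 s for a lock
    cursor.close()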
Why this matters for FastAPI and SQLite
In an async web stack, mixing synchronous database calls into async endpoints leads to blocking, brittle timing, and surprising lock contention. Using aiosqlite with SQLAlchemy's async engine aligns the database operations with the event loop, removes unnecessary blocking, and reduces the chance of colliding commits under light to moderate concurrency. With WAL enabled and autoflush=True in the session configuration, this approach is a good fit for low-concurrency scenarios. For heavier, more robust concurrency requirements, moving to a different database technology such as Postgres is a reasonable direction. A pragmatic way to gauge headroom is to stand up the app and run a load test, then check whether a large number of requests maps to the same number of inserted records.
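A quick check along those lines, as a sketch: it assumes the async app above is running on localhost:8000, that httpx is available, and that the script runs where AsyncSessionMaker and RowItem are importable; the request count of 100 is arbitrary.
import asyncio
import httpx
from sqlalchemy import func, select

async def smoke_test(n: int = 100):
    # Fire n concurrent writes at the running app
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8000", timeout=30) as client:
        responses = await asyncio.gather(*(client.post("/session_a") for _ in range(n)))
    ok = sum(1 for r in responses if r.status_code == 200)
    # Each 200 should map to exactly one inserted row
    async with AsyncSessionMaker() as sess:
        total = await sess.scalar(select(func.count()).select_from(RowItem))
    print(f"{ok}/{n} requests succeeded, {total} rows in the table")

asyncio.run(smoke_test())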
Takeaways
If you see "database is locked" under concurrent writes in FastAPI with SQLite, the fastest path to stability is to make the data layer asynchronous with aiosqlite and to enable WAL. Keep FastAPI's execution model in mind: async path functions run on the main event loop, while sync code is pushed to a threadpool. When everything stays async, commit and flush can yield, and the scheduler will do the right thing more often. If you still hit lock limits, consider SQLite settings such as WAL and a busy timeout, and guard critical writes with basic try/except handling. If your workload grows beyond low concurrency, evaluate a database designed for multi-writer scenarios.
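As a concrete shape for the try/except advice, here is a small retry helper around commit, a sketch: the retry count and backoff are illustrative, and SQLAlchemy surfaces the driver's locking error as sqlalchemy.exc.OperationalError.
import asyncio
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import AsyncSession

async def commit_with_retry(db: AsyncSession, retries: int = 3, base_delay: float = 0.2):
    for attempt in range(retries):
        try:
            await db.commit()
            return
        except OperationalError:
            # Likely "database is locked": roll back and retry with a growing delay
            await db.rollback()
            if attempt == retries - 1:
                raise
            await asyncio.sleep(base_delay * (attempt + 1))
A driver-level timeout also helps: sqlite3 accepts a timeout argument that aiosqlite forwards, so passing connect_args={"timeout": 15} to the engine makes SQLite itself wait for locks instead of failing immediately.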
The article is based on a question from StackOverflow by lee and an answer by danielcahall.