2025, Oct 31 03:47

Почему чтение через stream() внутри транзакции Firestore опасно

Почему stream() и нетранзакционные чтения внутри транзакции Firestore в Python опасны: потеря атомарности и лишние повторы. Разбор и безопасное решение с кодом.

Выполнять запросы Firestore вроде stream() внутри транзакции Python может казаться безобидным, когда «всё просто работает». Но есть скрытая ловушка: если такие чтения идут не через сам объект транзакции, вы лишаетесь ключевых гарантий транзакционности и рискуете сделать выполнение медленнее и растратить лишние ресурсы. Рекомендация избегать нетранзакционных чтений и записей внутри транзакции остаётся важной, даже если ваш код не падает.

Пример проблемы

Ниже показан фрагмент, который удаляет все существующие фрагменты markdown и записывает новые внутри транзакции. Список на удаление берётся из обычного stream() коллекции, который не использует переданный объект транзакции.

def write_md_chunks(txn: firestore.Transaction, note_doc: firestore.DocumentReference, md_text: str):
    md_coll = note_doc.collection('markdowns')
    # Удаляем существующие фрагменты (нетранзакционное чтение внутри транзакции)
    existing = md_coll.stream()
    for snap in existing:
        target = md_coll.document(snap.id)
        txn.delete(target)
        print("EXPECTED A CRASH, BUT IT DIDN'T. WHY?")
    pieces = helpers.split_text(md_text, CHUNK_LIMIT)
    for idx, part in enumerate(pieces):
        part_doc = md_coll.document()
        txn.set(part_doc, {
            'text': part,
            'order': idx + 1
        })
@http_fx.on_request(
    cors=cors_opts.CorsOptions(
        cors_origins=["*"],
        cors_methods=["POST"],
    )
)
def handle_md_edit(req: http_fx.Request) -> http_fx.Response:
    if req.method != 'POST':
        return http_fx.Response(
            json.dumps({"error": "Only POST requests are allowed"}),
            status=405
        )
    payload = req.get_json(silent=True)
    if not payload:
        return http_fx.Response(
            json.dumps({"error": "Invalid request data"}),
            status=400
        )
    user_id = payload['data']['uid']
    note_id = payload['data']['doc_id']
    next_md = payload['data']['markdown']
    if helpers.is_blank_or_none(next_md):
        return http_fx.Response(
            json.dumps({"error": "Invalid request data"}),
            status=400
        )
    if len(next_md) > 524288:
        return http_fx.Response(
            json.dumps({"error": "Invalid request data"}),
            status=400
        )
    db = firestore.client()
    now_ms = int(time.time() * 1000)
    try:
        @firestore.transactional
        def apply_note_update(txn):
            note_doc = (
                db.collection('users')
                .document(user_id)
                .collection('notes')
                .document(note_id)
            )
            current_md = md_tools.get_markdown(
                transaction=txn,
                note_ref=note_doc
            )
            if next_md != current_md:
                original_md = md_tools.get_original_markdown(
                    transaction=txn,
                    note_ref=note_doc
                )
                if next_md == original_md:
                    md_tools.delete_original_markdown(
                        transaction=txn,
                        note_ref=note_doc
                    )
                else:
                    md_tools.insert_original_markdown_if_not_exist(
                        transaction=txn,
                        note_ref=note_doc,
                        original_markdown=current_md
                    )
                md_tools.update_markdown(
                    transaction=txn,
                    note_ref=note_doc,
                    markdown=next_md
                )
                txn.update(
                    note_doc,
                    {
                        'modified_timestamp': now_ms,
                        'synced_timestamp': now_ms
                    }
                )
        txn = db.transaction()
        apply_note_update(txn)
        response = {
            "data": {
                "modified_timestamp": now_ms,
                "synced_timestamp": now_ms
            }
        }
        return http_fx.Response(
            json.dumps(response),
            status=200
        )
    except Exception as err:
        print(f"Error updating note markdown: {str(err)}")
        return http_fx.Response(
            json.dumps({"data": {"error": f"An error occurred: {str(err)}"}}),
            status=500
        )

В чём здесь настоящая проблема

Дело не в том, что stream() сам по себе «незаконен». Проблема в любых чтениях или записях базы, которые выполняются внутри тела транзакции, но при этом не используют переданный объект транзакции. Этого подхода стоит избегать по двум причинам. Во-первых, транзакции должны выполняться быстро, чтобы снижать конкуренцию. Запросы, которые транзакция не координирует, могут её замедлять. Во-вторых, при конфликте транзакция может перезапускаться несколько раз. Если вы читаете данные нетранзакционными вызовами внутри тела транзакции, эти чтения будут повторяться лишний раз. Вы также теряете выгоду от автоматических повторов, связанных с такими чтениями, что подрывает атомарность.

Поэтому вы и не увидели падения. У SDK нет надёжного и малозатратного способа на лету обнаружить, что вы смешали нетранзакционные чтения с активной транзакцией, особенно когда задействовано несколько потоков. Ответственность на вас: соблюдайте лучшие практики и держите транзакционную часть действительно транзакционной.

Как исправить

Есть два безопасных подхода. Либо читать данные через объект транзакции, чтобы изменения этих документов корректно приводили к повтору и сохраняли атомарность, либо полностью вынести такие чтения за пределы транзакции, чтобы не замедлять и не усложнять её выполнение.

В переработанном примере ниже вызов stream() вынесен из транзакции. Сама транзакция удаляет по ID и записывает новые фрагменты. Поведение в целом сохраняется, но нетранзакционное чтение больше не выполняется внутри тела транзакции.

def put_md_chunks_precomputed(txn: firestore.Transaction,
                              note_doc: firestore.DocumentReference,
                              md_text: str,
                              chunk_ids: list[str]):
    md_coll = note_doc.collection('markdowns')
    # Удаляем известные документы фрагментов внутри транзакции
    for cid in chunk_ids:
        txn.delete(md_coll.document(cid))
    parts = helpers.split_text(md_text, CHUNK_LIMIT)
    for idx, part in enumerate(parts):
        new_doc = md_coll.document()
        txn.set(new_doc, {
            'text': part,
            'order': idx + 1
        })
@http_fx.on_request(
    cors=cors_opts.CorsOptions(
        cors_origins=["*"],
        cors_methods=["POST"],
    )
)
def handle_md_edit(req: http_fx.Request) -> http_fx.Response:
    if req.method != 'POST':
        return http_fx.Response(
            json.dumps({"error": "Only POST requests are allowed"}),
            status=405
        )
    payload = req.get_json(silent=True)
    if not payload:
        return http_fx.Response(
            json.dumps({"error": "Invalid request data"}),
            status=400
        )
    user_id = payload['data']['uid']
    note_id = payload['data']['doc_id']
    next_md = payload['data']['markdown']
    if helpers.is_blank_or_none(next_md):
        return http_fx.Response(
            json.dumps({"error": "Invalid request data"}),
            status=400
        )
    if len(next_md) > 524288:
        return http_fx.Response(
            json.dumps({"error": "Invalid request data"}),
            status=400
        )
    db = firestore.client()
    now_ms = int(time.time() * 1000)
    # Предчтение вне транзакции
    note_doc = (
        db.collection('users')
        .document(user_id)
        .collection('notes')
        .document(note_id)
    )
    md_coll = note_doc.collection('markdowns')
    to_delete_ids = [snap.id for snap in md_coll.stream()]
    try:
        @firestore.transactional
        def apply_note_update(txn):
            # Для ясности каждый раз получаем тот же note_doc внутри
            nd = (
                db.collection('users')
                .document(user_id)
                .collection('notes')
                .document(note_id)
            )
            current_md = md_tools.get_markdown(transaction=txn, note_ref=nd)
            if next_md != current_md:
                original_md = md_tools.get_original_markdown(transaction=txn, note_ref=nd)
                if next_md == original_md:
                    md_tools.delete_original_markdown(transaction=txn, note_ref=nd)
                else:
                    md_tools.insert_original_markdown_if_not_exist(
                        transaction=txn,
                        note_ref=nd,
                        original_markdown=current_md
                    )
                # Заменяем фрагменты, используя заранее прочитанные ID
                put_md_chunks_precomputed(txn, nd, next_md, to_delete_ids)
                txn.update(nd, {
                    'modified_timestamp': now_ms,
                    'synced_timestamp': now_ms
                })
        txn = db.transaction()
        apply_note_update(txn)
        return http_fx.Response(
            json.dumps({
                "data": {
                    "modified_timestamp": now_ms,
                    "synced_timestamp": now_ms
                }
            }),
            status=200
        )
    except Exception as err:
        print(f"Error updating note markdown: {str(err)}")
        return http_fx.Response(
            json.dumps({"data": {"error": f"An error occurred: {str(err)}"}}),
            status=500
        )

Почему это стоит помнить

Транзакции могут перезапускаться, и их нужно держать «лёгкими». Чтение или запись в Firestore без использования объекта транзакции, пока вы находитесь внутри её тела, противоречит этим целям. Ошибки во время выполнения при этом не гарантированы, и у SDK нет практического способа автоматически запретить такой подход без накладных расходов. Это вопрос дисциплины. Сохраняйте транзакционный путь кода чистым и предсказуемым — тогда повторы и атомарность будут работать на вас.

Выводы

Если нужно читать данные, которые влияют на результат транзакции, делайте это через объект транзакции. Если требуется лишь список ID или иной контекст, не требующий координации транзакции, получите его до старта транзакции. Не считайте отсутствие падений признаком безопасности шаблона. Рекомендация остаётся неизменной: избегайте нетранзакционных чтений и записей Firestore внутри транзакции.

Статья основана на вопросе с StackOverflow от Cheok Yan Cheng и ответе Doug Stevenson.