2025, Dec 01 06:02

Kafka-консьюмер пропускает сообщение при рестарте: как это исправить

Почему при рестарте Kafka-консьюмер пропускает сообщение и как это исправить: коммит смещений (TopicPartition/OffsetAndMetadata), стабильный group.id, KRaft.

Перезапуски Kafka-консьюмера, из-за которых незаметно пропускается одно сообщение в полёте, особенно раздражают, когда кластер исправен, а продьюсер подтверждает, что топик жив. В KRaft-конфигурации с тремя брокерами и тремя контроллерами на confluentinc/cp-kafka:7.8.0 продьюсер отправляет m1–m5, консьюмер обрабатывает m1–m3 и уходит офлайн; после возврата m4 так и не приходит, тогда как m5 появляется. Разберёмся, что происходит и как обеспечить устойчивое потребление при рестартах без постоянной смены group.id.

Минимальный пример, воспроизводящий проблему

def consume_source_attrs_loop():
    client = None
    backoff_tries = 0
    stream_name = 'eys_device_wins_updated_defSourceAttrs'
    try:
        while True:
            if client is None:
                try:
                    suffix_id = uuid.uuid4().hex[:4]
                    logger.info("Creating new Kafka consumer...")
                    client = BrokerLink(
                        group_id="wins-consumer-group-cost-service-01",
                        offset_reset="latest",
                    )

                    if not client.topic_exists(stream_name):
                        client.add_topic(stream_name)

                    client = client.create_consumer()
                    client.subscribe([stream_name])
                    logger.info(f"Subscribed to Kafka topic: {stream_name}")
                except Exception as connect_exc:
                    ops

            evt = client.poll(1.0)

            if evt is None:
                continue

            if evt.error():
                signals

            body_bytes = evt.value()
            if body_bytes is None:
                logger.warning(f"Received message with None value. Message key: {evt.key()}")
                continue

            try:
                text_body = body_bytes.decode('utf-8')
            except Exception as decode_exc:
                logger.error(f"Failed to decode message: {decode_exc} - raw value: {body_bytes}")
                continue

            try:
                parsed_json = json.loads(text_body)
                client.commit(message=evt)
            except json.JSONDecodeError as parse_exc:
                logger.error(f"Failed to parse JSON: {parse_exc} - message: {text_body}")
                continue

            if parsed_json is not None:
                try:
                    process_attrs_payload(parsed_json)
                except Exception as biz_exc:
                    logger.error(f"Error in business logic: {biz_exc} - message: {parsed_json}")

    except Exception as fatal_exc:
        logger.exception(f"consume_source_attrs_loop crashed with: {fatal_exc}")
        backoff_tries += 1
        delay_s = min(60, 2 ** backoff_tries)
        logger.warning(f"Retrying consumer in {delay_s} seconds...")
        time.sleep(delay_s)
    finally:
        backoff_tries = 0
        if client is not None:
            client.commit()
            client.close()
            logger.info("Kafka consumer closed.")

Что здесь на самом деле происходит

Такое поведение не является нормой. Корень проблемы — в управлении смещениями (offsets). Важны два момента. Во-первых, API коммита вызывается так, что фактическое смещение, которое вы хотите сохранить, не задаётся. Верная форма ожидает отображение разделов на смещения, то есть {TopicPartition: OffsetAndMetadata}. Коммит сообщения через несовместную сигнатуру не фиксирует нужную позицию, и группа может начать следующую сессию с неподходящего смещения. Во-вторых, полагаться на коммит в финальном блоке — хрупко: он не гарантирован к исполнению при завершении по сигналам SIGTERM или SIGINT. В таких условиях финальное смещение может так и не записаться.

Есть подтверждающая зацепка из эксперимента с конфигурацией. Переход на новый group.id и offset_reset='earliest' делает m4 видимым. Эта комбинация сбрасывает ранее сохранённую позицию и стартует для этой группы с начала доступных данных, поэтому пропущенное сообщение появляется. Это полезная диагностика, но не стратегия для продакшена, если нужна долговременная фиксация состояния.

Исправление: коммитим нужную позицию и не полагаемся на хрупкий путь завершения

Сохраните текущий group.id — так обеспечивается устойчивость. Убедитесь, что вызов commit передаёт явный раздел и следующее смещение, которое надо сохранить. И относитесь к финализатору как к best-effort: надёжное место для фиксации прогресса — сразу после успешной обработки события.

def consume_source_attrs_loop():
    client = None
    backoff_tries = 0
    stream_name = 'eys_device_wins_updated_defSourceAttrs'
    try:
        while True:
            if client is None:
                try:
                    suffix_id = uuid.uuid4().hex[:4]
                    logger.info("Creating new Kafka consumer...")
                    client = BrokerLink(
                        group_id="wins-consumer-group-cost-service-01",
                        offset_reset="latest",
                    )

                    if not client.topic_exists(stream_name):
                        client.add_topic(stream_name)

                    client = client.create_consumer()
                    client.subscribe([stream_name])
                    logger.info(f"Subscribed to Kafka topic: {stream_name}")
                except Exception as connect_exc:
                    ops

            evt = client.poll(1.0)
            if evt is None:
                continue

            if evt.error():
                signals

            body_bytes = evt.value()
            if body_bytes is None:
                logger.warning(f"Received message with None value. Message key: {evt.key()}")
                continue

            try:
                text_body = body_bytes.decode('utf-8')
            except Exception as decode_exc:
                logger.error(f"Failed to decode message: {decode_exc} - raw value: {body_bytes}")
                continue

            try:
                parsed_json = json.loads(text_body)

                # Зафиксируйте точную позицию: {TopicPartition: OffsetAndMetadata}
                tp = TopicPartition(evt.topic(), evt.partition())
                om = OffsetAndMetadata(evt.offset() + 1, None)
                client.commit({tp: om})

            except json.JSONDecodeError as parse_exc:
                logger.error(f"Failed to parse JSON: {parse_exc} - message: {text_body}")
                continue

            if parsed_json is not None:
                try:
                    process_attrs_payload(parsed_json)
                except Exception as biz_exc:
                    logger.error(f"Error in business logic: {biz_exc} - message: {parsed_json}")

    except Exception as fatal_exc:
        logger.exception(f"consume_source_attrs_loop crashed with: {fatal_exc}")
        backoff_tries += 1
        delay_s = min(60, 2 ** backoff_tries)
        logger.warning(f"Retrying consumer in {delay_s} seconds...")
        time.sleep(delay_s)
    finally:
        backoff_tries = 0
        if client is not None:
            # Это best-effort и может не выполниться при SIGTERM/SIGINT
            client.commit()
            client.close()
            logger.info("Kafka consumer closed.")

Чтобы убедиться, что смещения действительно там, где вы их ожидаете, посмотрите состояние группы стандартным скриптом и сравните зафиксированные смещения с конечными (end offsets) по каждому разделу.

kafka-consumer-groups.sh --bootstrap-server broker1:30903,broker2:30448,broker3:30805 \
  --describe --group wins-consumer-group-cost-service-01

Почему это важно

Надёжность потребления держится на корректном учёте смещений. Если сохранённая позиция неверна или вовсе не сохранена, перезапущенный консьюмер либо пропустит данные, либо обработает их повторно. В зависимости от трафика это может выглядеть как исчезновение одного сообщения в периоде простоя сервиса. Коммит точного следующего смещения на корректном TopicPartition и отказ от надежд на финализаторы при завершении делают рестарты предсказуемыми.

Итоги

Это не «нормальное» поведение Kafka. Симптом указывает на обработку смещений, а не на проблемы брокеров или KRaft. Используйте стабильный consumer group.id. Значение offset.reset подбирайте под сценарий восстановления, но не полагайтесь на него при обычных рестартах. После успешной обработки коммитьте правильную структуру {TopicPartition: OffsetAndMetadata}. Не рассчитывайте на коммиты в момент очистки: при завершении по сигналам они могут не выполниться. Если сомневаетесь, опишите состояние группы консьюмера и проверьте, что именно зафиксировано, прежде чем поднимать сервис.