2025, Dec 01 06:02
Kafka-консьюмер пропускает сообщение при рестарте: как это исправить
Почему при рестарте Kafka-консьюмер пропускает сообщение и как это исправить: коммит смещений (TopicPartition/OffsetAndMetadata), стабильный group.id, KRaft.
Перезапуски Kafka-консьюмера, из-за которых незаметно пропускается одно сообщение в полёте, особенно раздражают, когда кластер исправен, а продьюсер подтверждает, что топик жив. В KRaft-конфигурации с тремя брокерами и тремя контроллерами на confluentinc/cp-kafka:7.8.0 продьюсер отправляет m1–m5, консьюмер обрабатывает m1–m3 и уходит офлайн; после возврата m4 так и не приходит, тогда как m5 появляется. Разберёмся, что происходит и как обеспечить устойчивое потребление при рестартах без постоянной смены group.id.
Минимальный пример, воспроизводящий проблему
def consume_source_attrs_loop():
client = None
backoff_tries = 0
stream_name = 'eys_device_wins_updated_defSourceAttrs'
try:
while True:
if client is None:
try:
suffix_id = uuid.uuid4().hex[:4]
logger.info("Creating new Kafka consumer...")
client = BrokerLink(
group_id="wins-consumer-group-cost-service-01",
offset_reset="latest",
)
if not client.topic_exists(stream_name):
client.add_topic(stream_name)
client = client.create_consumer()
client.subscribe([stream_name])
logger.info(f"Subscribed to Kafka topic: {stream_name}")
except Exception as connect_exc:
ops
evt = client.poll(1.0)
if evt is None:
continue
if evt.error():
signals
body_bytes = evt.value()
if body_bytes is None:
logger.warning(f"Received message with None value. Message key: {evt.key()}")
continue
try:
text_body = body_bytes.decode('utf-8')
except Exception as decode_exc:
logger.error(f"Failed to decode message: {decode_exc} - raw value: {body_bytes}")
continue
try:
parsed_json = json.loads(text_body)
client.commit(message=evt)
except json.JSONDecodeError as parse_exc:
logger.error(f"Failed to parse JSON: {parse_exc} - message: {text_body}")
continue
if parsed_json is not None:
try:
process_attrs_payload(parsed_json)
except Exception as biz_exc:
logger.error(f"Error in business logic: {biz_exc} - message: {parsed_json}")
except Exception as fatal_exc:
logger.exception(f"consume_source_attrs_loop crashed with: {fatal_exc}")
backoff_tries += 1
delay_s = min(60, 2 ** backoff_tries)
logger.warning(f"Retrying consumer in {delay_s} seconds...")
time.sleep(delay_s)
finally:
backoff_tries = 0
if client is not None:
client.commit()
client.close()
logger.info("Kafka consumer closed.")
Что здесь на самом деле происходит
Такое поведение не является нормой. Корень проблемы — в управлении смещениями (offsets). Важны два момента. Во-первых, API коммита вызывается так, что фактическое смещение, которое вы хотите сохранить, не задаётся. Верная форма ожидает отображение разделов на смещения, то есть {TopicPartition: OffsetAndMetadata}. Коммит сообщения через несовместную сигнатуру не фиксирует нужную позицию, и группа может начать следующую сессию с неподходящего смещения. Во-вторых, полагаться на коммит в финальном блоке — хрупко: он не гарантирован к исполнению при завершении по сигналам SIGTERM или SIGINT. В таких условиях финальное смещение может так и не записаться.
Есть подтверждающая зацепка из эксперимента с конфигурацией. Переход на новый group.id и offset_reset='earliest' делает m4 видимым. Эта комбинация сбрасывает ранее сохранённую позицию и стартует для этой группы с начала доступных данных, поэтому пропущенное сообщение появляется. Это полезная диагностика, но не стратегия для продакшена, если нужна долговременная фиксация состояния.
Исправление: коммитим нужную позицию и не полагаемся на хрупкий путь завершения
Сохраните текущий group.id — так обеспечивается устойчивость. Убедитесь, что вызов commit передаёт явный раздел и следующее смещение, которое надо сохранить. И относитесь к финализатору как к best-effort: надёжное место для фиксации прогресса — сразу после успешной обработки события.
def consume_source_attrs_loop():
client = None
backoff_tries = 0
stream_name = 'eys_device_wins_updated_defSourceAttrs'
try:
while True:
if client is None:
try:
suffix_id = uuid.uuid4().hex[:4]
logger.info("Creating new Kafka consumer...")
client = BrokerLink(
group_id="wins-consumer-group-cost-service-01",
offset_reset="latest",
)
if not client.topic_exists(stream_name):
client.add_topic(stream_name)
client = client.create_consumer()
client.subscribe([stream_name])
logger.info(f"Subscribed to Kafka topic: {stream_name}")
except Exception as connect_exc:
ops
evt = client.poll(1.0)
if evt is None:
continue
if evt.error():
signals
body_bytes = evt.value()
if body_bytes is None:
logger.warning(f"Received message with None value. Message key: {evt.key()}")
continue
try:
text_body = body_bytes.decode('utf-8')
except Exception as decode_exc:
logger.error(f"Failed to decode message: {decode_exc} - raw value: {body_bytes}")
continue
try:
parsed_json = json.loads(text_body)
# Зафиксируйте точную позицию: {TopicPartition: OffsetAndMetadata}
tp = TopicPartition(evt.topic(), evt.partition())
om = OffsetAndMetadata(evt.offset() + 1, None)
client.commit({tp: om})
except json.JSONDecodeError as parse_exc:
logger.error(f"Failed to parse JSON: {parse_exc} - message: {text_body}")
continue
if parsed_json is not None:
try:
process_attrs_payload(parsed_json)
except Exception as biz_exc:
logger.error(f"Error in business logic: {biz_exc} - message: {parsed_json}")
except Exception as fatal_exc:
logger.exception(f"consume_source_attrs_loop crashed with: {fatal_exc}")
backoff_tries += 1
delay_s = min(60, 2 ** backoff_tries)
logger.warning(f"Retrying consumer in {delay_s} seconds...")
time.sleep(delay_s)
finally:
backoff_tries = 0
if client is not None:
# Это best-effort и может не выполниться при SIGTERM/SIGINT
client.commit()
client.close()
logger.info("Kafka consumer closed.")
Чтобы убедиться, что смещения действительно там, где вы их ожидаете, посмотрите состояние группы стандартным скриптом и сравните зафиксированные смещения с конечными (end offsets) по каждому разделу.
kafka-consumer-groups.sh --bootstrap-server broker1:30903,broker2:30448,broker3:30805 \
--describe --group wins-consumer-group-cost-service-01
Почему это важно
Надёжность потребления держится на корректном учёте смещений. Если сохранённая позиция неверна или вовсе не сохранена, перезапущенный консьюмер либо пропустит данные, либо обработает их повторно. В зависимости от трафика это может выглядеть как исчезновение одного сообщения в периоде простоя сервиса. Коммит точного следующего смещения на корректном TopicPartition и отказ от надежд на финализаторы при завершении делают рестарты предсказуемыми.
Итоги
Это не «нормальное» поведение Kafka. Симптом указывает на обработку смещений, а не на проблемы брокеров или KRaft. Используйте стабильный consumer group.id. Значение offset.reset подбирайте под сценарий восстановления, но не полагайтесь на него при обычных рестартах. После успешной обработки коммитьте правильную структуру {TopicPartition: OffsetAndMetadata}. Не рассчитывайте на коммиты в момент очистки: при завершении по сигналам они могут не выполниться. Если сомневаетесь, опишите состояние группы консьюмера и проверьте, что именно зафиксировано, прежде чем поднимать сервис.