2025, Nov 21 09:00

How to Prevent Missing Messages After Kafka Consumer Restarts: Correct Offset Commits per TopicPartition

Fix Kafka consumer restarts that skip a message: commit the next offset per TopicPartition, avoid shutdown-only commits, and keep a stable group.id in KRaft.

Kafka consumer restarts that silently skip a single in-flight message are frustrating, especially when the cluster is healthy and the producer proves that the topic is alive. In a KRaft-based setup with three brokers and three controllers on confluentinc/cp-kafka:7.8.0, a producer sends m1–m5, the consumer processes m1–m3, then goes offline; after it comes back, m4 never arrives, while m5 does. Let’s unpack what’s going on and how to make consumption durable across restarts without resorting to a new group.id each time.

Minimal example that reproduces the symptom
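
# Standard-library imports assumed by this snippet; logger, BrokerLink and
# process_attrs_payload are application-level helpers defined elsewhere.
import json
import time
import uuid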

def consume_source_attrs_loop():
    client = None
    backoff_tries = 0
    stream_name = 'eys_device_wins_updated_defSourceAttrs'
    try:
        while True:
            if client is None:
                try:
                    suffix_id = uuid.uuid4().hex[:4]
                    logger.info("Creating new Kafka consumer...")
                    client = BrokerLink(
                        group_id="wins-consumer-group-cost-service-01",
                        offset_reset="latest",
                    )

                    if not client.topic_exists(stream_name):
                        client.add_topic(stream_name)

                    client = client.create_consumer()
                    client.subscribe([stream_name])
                    logger.info(f"Subscribed to Kafka topic: {stream_name}")
                except Exception as connect_exc:
                    logger.error(f"Failed to create Kafka consumer: {connect_exc}")
                    # Drop the half-built client so the next iteration retries from scratch
                    client = None
                    time.sleep(5)
                    continue

            evt = client.poll(1.0)

            if evt is None:
                continue

            if evt.error():
                logger.error(f"Kafka message error: {evt.error()}")
                continue

            body_bytes = evt.value()
            if body_bytes is None:
                logger.warning(f"Received message with None value. Message key: {evt.key()}")
                continue

            try:
                text_body = body_bytes.decode('utf-8')
            except Exception as decode_exc:
                logger.error(f"Failed to decode message: {decode_exc} - raw value: {body_bytes}")
                continue

            try:
                parsed_json = json.loads(text_body)
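                # Problem spot: this commit form does not store the intended next offset
                # for the partition (see the explanation below)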
                client.commit(message=evt)
            except json.JSONDecodeError as parse_exc:
                logger.error(f"Failed to parse JSON: {parse_exc} - message: {text_body}")
                continue

            if parsed_json is not None:
                try:
                    process_attrs_payload(parsed_json)
                except Exception as biz_exc:
                    logger.error(f"Error in business logic: {biz_exc} - message: {parsed_json}")

    except Exception as fatal_exc:
        logger.exception(f"consume_source_attrs_loop crashed with: {fatal_exc}")
        backoff_tries += 1
        delay_s = min(60, 2 ** backoff_tries)
        logger.warning(f"Retrying consumer in {delay_s} seconds...")
        time.sleep(delay_s)
    finally:
        backoff_tries = 0
        if client is not None:
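            # Fragile: this cleanup commit is not guaranteed to run on SIGTERM/SIGINT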
            client.commit()
            client.close()
            logger.info("Kafka consumer closed.")

What’s really biting here

This is not expected Kafka behavior; the root of the problem sits in how offsets are managed. Two details stand out. First, the commit call is invoked in a way that does not actually express the offset you want to store. The correct form takes a map of partitions to offsets, as in {TopicPartition: OffsetAndMetadata}, and the committed value must be the offset of the next message to read, which is why the fix below stores evt.offset() + 1. Committing through an incompatible signature won’t persist the intended position, which can leave the group in a state where the next session starts at an unintended offset. Second, relying on the commit call in the cleanup path is fragile: that block is not guaranteed to run on shutdown events like SIGTERM or SIGINT, so when the process is killed under those conditions the final offset might never be recorded.

There’s a confirming clue in the configuration experiment. Switching to a fresh group.id with offset_reset='earliest' makes m4 appear: a brand-new group has no committed position at all, and 'earliest' starts it from the beginning of the retained data for the topic, so the missing message becomes visible again. It’s a useful diagnostic, but not a production strategy when you need durable state.
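
If you ever do need to replay older records, you don’t have to rotate group.id: the same tooling can rewind the committed position for the existing group. A sketch, reusing the broker addresses and group from this post (offsets can only be reset while the group has no active members):

kafka-consumer-groups.sh --bootstrap-server broker1:30903,broker2:30448,broker3:30805 \
  --group wins-consumer-group-cost-service-01 --topic eys_device_wins_updated_defSourceAttrs \
  --reset-offsets --to-earliest --execute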

Fix: commit the right thing and don’t depend on a fragile shutdown path

Keep your existing group.id for durability. Ensure that the commit call provides the explicit partition and the next offset to store. Also, treat the finalizer as best-effort only; the reliable place to record progress is right after you’ve successfully processed an event.

def consume_source_attrs_loop():
    client = None
    backoff_tries = 0
    stream_name = 'eys_device_wins_updated_defSourceAttrs'
    try:
        while True:
            if client is None:
                try:
                    suffix_id = uuid.uuid4().hex[:4]
                    logger.info("Creating new Kafka consumer...")
                    client = BrokerLink(
                        group_id="wins-consumer-group-cost-service-01",
                        offset_reset="latest",
                    )

                    if not client.topic_exists(stream_name):
                        client.add_topic(stream_name)

                    client = client.create_consumer()
                    client.subscribe([stream_name])
                    logger.info(f"Subscribed to Kafka topic: {stream_name}")
                except Exception as connect_exc:
                    logger.error(f"Failed to create Kafka consumer: {connect_exc}")
                    # Drop the half-built client so the next iteration retries from scratch
                    client = None
                    time.sleep(5)
                    continue

            evt = client.poll(1.0)
            if evt is None:
                continue

            if evt.error():
                logger.error(f"Kafka message error: {evt.error()}")
                continue

            body_bytes = evt.value()
            if body_bytes is None:
                logger.warning(f"Received message with None value. Message key: {evt.key()}")
                continue

            try:
                text_body = body_bytes.decode('utf-8')
            except Exception as decode_exc:
                logger.error(f"Failed to decode message: {decode_exc} - raw value: {body_bytes}")
                continue

            try:
                parsed_json = json.loads(text_body)

                # Commit the precise position: {TopicPartition: OffsetAndMetadata}
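                # TopicPartition and OffsetAndMetadata come from whichever Kafka client
                # library BrokerLink wraps; import them at module scope.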
                tp = TopicPartition(evt.topic(), evt.partition())
                om = OffsetAndMetadata(evt.offset() + 1, None)
                client.commit({tp: om})

            except json.JSONDecodeError as parse_exc:
                logger.error(f"Failed to parse JSON: {parse_exc} - message: {text_body}")
                continue

            if parsed_json is not None:
                try:
                    process_attrs_payload(parsed_json)
                except Exception as biz_exc:
                    logger.error(f"Error in business logic: {biz_exc} - message: {parsed_json}")

    except Exception as fatal_exc:
        logger.exception(f"consume_source_attrs_loop crashed with: {fatal_exc}")
        backoff_tries += 1
        delay_s = min(60, 2 ** backoff_tries)
        logger.warning(f"Retrying consumer in {delay_s} seconds...")
        time.sleep(delay_s)
    finally:
        backoff_tries = 0
        if client is not None:
            # This is best-effort and may not run on SIGTERM/SIGINT
            client.commit()
            client.close()
            logger.info("Kafka consumer closed.")

To verify that offsets are truly where you expect them to be, inspect the group state with the standard script and compare the committed position (CURRENT-OFFSET) with the end of the log (LOG-END-OFFSET) on each partition; the LAG column shows how far behind the group is.

kafka-consumer-groups.sh --bootstrap-server broker1:30903,broker2:30448,broker3:30805 \
  --describe --group wins-consumer-group-cost-service-01
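
If your distribution also ships kafka-get-offsets.sh, you can cross-check the log end offsets independently of any consumer group (flags shown for recent Kafka releases; --time -1 asks for the latest offset per partition):

kafka-get-offsets.sh --bootstrap-server broker1:30903,broker2:30448,broker3:30805 \
  --topic eys_device_wins_updated_defSourceAttrs --time -1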

Why this matters

Durable consumption hinges on correct offset bookkeeping. If the stored position is wrong or not stored at all, a restarted consumer either skips data or reprocesses it. Depending on traffic patterns, that can look like a single message disappearing in the gap while the service was offline. Ensuring the API commits the exact next offset on the correct TopicPartition, and not treating finalizers as a safety net for shutdown, makes restarts predictable.
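
One plausible timeline for the original symptom, assuming m4 was produced while the consumer was offline and m5 only after it resubscribed:

offsets 0-2 (m1-m3)   processed, but the intended position 3 was never stored
offset 3 (m4)         written while the consumer is down
restart               no valid committed offset is found, so offset_reset="latest" jumps to the log end
offset 4 (m5)         produced after that new position is taken, so it is delivered; m4 is skipped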

Takeaways

This is not expected Kafka behavior. The symptom points to offset handling rather than to the brokers or to KRaft itself. Use a stable consumer group.id. Keep auto.offset.reset (offset_reset in this wrapper) at a value that matches your recovery intent, but do not rely on it for normal restarts; it only applies when the group has no valid committed offset. Commit the correct structure {TopicPartition: OffsetAndMetadata} after each successfully handled event. Don’t depend on cleanup-time commits; they may not run on termination signals. When in doubt, describe the consumer group to confirm what is actually committed before you bring the service back.