In some cases, your Kafka consumers may consistently show a lag of 1, even though they appear to be fully caught up. This behavior is confusing and misleading — it suggests that the consumer is behind, or that there’s a record it hasn’t processed yet.
But when you inspect your application, everything looks fine:
- The consumer is running
- Messages are being processed
- There are no errors
And yet… lag = 1 persists.
To make matters more puzzling, tools used to inspect Kafka messages may show missing offsets or gaps — as if data was lost or skipped entirely.
So what’s really going on?
The answer lies in how Kafka handles transactions. When using exactly-once semantics (EOS) — such as in Kafka Streams or transactional producers — Kafka writes special internal records called transaction markers. These are not visible to consumers, but they do occupy offsets in the partition.
So even though there’s no user data left to consume, these invisible control records can still push the log-end offset ahead of the consumer’s committed offset — resulting in lag = 1 with nothing new to read.
Let’s walk through a concrete example to simulate this behavior and observe it in action.
Step 1: Kafka Streams App with Exactly-Once Transactions
Let’s build a minimal Kafka Streams application that:
- Reads records from input-topic
- Converts each value to uppercase
- Writes the result to output-topic
What’s important here is how we configure the Streams app:
- processing.guarantee = exactly_once_v2 — enables transactional processing
- commit.interval.ms = 1000 — makes Kafka commit transactions frequently, so we’re more likely to see one record per transaction, followed by a commit control marker at the next offset
This setup simulates the conditions under which phantom lag = 1 can appear, even though no user data is pending.
public static void main(String[] args) {
    Properties p = new Properties();
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "one-txn-demo");
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Transactional processing: every commit appends a control marker to the output partitions
    p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
    // Disable record caching so each record is forwarded downstream immediately
    p.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
    p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
    // Make the internal producer flush immediately
    p.put(StreamsConfig.producerPrefix("linger.ms"), 0);
    p.put(StreamsConfig.producerPrefix("batch.size"), 1);
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

    StreamsBuilder b = new StreamsBuilder();
    b.<String, String>stream("input-topic")
        .mapValues(v -> v == null ? null : v.toUpperCase())
        .to("output-topic");

    KafkaStreams s = new KafkaStreams(b.build(), p);
    s.start();
    Runtime.getRuntime().addShutdownHook(new Thread(s::close));
}
Create the topics (1 partition each):
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic input-topic --partitions 1 --replication-factor 1

kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic output-topic --partitions 1 --replication-factor 1
and start the application.
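If you prefer to create the topics from code rather than the CLI, the same thing can be done with the AdminClient. A minimal sketch, assuming the same local broker:

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        // Same topics as the CLI commands above: 1 partition, replication factor 1
        admin.createTopics(List.of(
                new NewTopic("input-topic", 1, (short) 1),
                new NewTopic("output-topic", 1, (short) 1)))
            .all().get(); // wait until the topics are actually created
    }
}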
Step 2: Run a Consumer That Commits Offsets Manually
Now let’s run a simple consumer that reads from output-topic and commits offsets manually — right after processing each record. This ensures the consumer only commits what it has truly seen.
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "reading-consumer");
    // Start from the beginning of the topic if there is no committed offset yet
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    try (KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
        c.subscribe(Collections.singletonList("output-topic"));
        while (true) {
            ConsumerRecords<String, String> recs = c.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> r : recs) {
                // process the record, then commit the NEXT offset explicitly
                c.commitSync(Collections.singletonMap(
                        new TopicPartition(r.topic(), r.partition()),
                        new OffsetAndMetadata(r.offset() + 1)));
            }
            // no further periodic commits here
        }
    }
}
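One detail worth noting: this consumer keeps the default isolation.level (read_uncommitted). In a transactional pipeline you would usually read only committed data, which is a one-line change, shown below as a sketch. It is optional for this demo, and the phantom lag appears either way, because the commit marker’s offset is never handed to the application:

// Only return records from committed transactions (optional for this demo)
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");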
Step 3: Send One Message
Finally, send a single message to the input-topic using a basic producer app:
public static void main(final String[] args) throws Exception {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>("input-topic", "key", "home"));
        producer.flush();
    }
}
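Note that this producer is plain and non-transactional. The transaction we are about to observe is opened by the Streams app’s internal producer when it writes the uppercased record to output-topic. Conceptually, that write looks roughly like the sketch below (a simplified illustration with a made-up transactional.id, not the actual Streams internals). The commitTransaction() call is what makes the broker append a COMMIT control marker to the partition:

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // A transactional.id turns this into a transactional producer (Streams generates its own)
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("output-topic", "key", "HOME"));
        // Committing the transaction writes a COMMIT control marker to every partition
        // the transaction touched; that marker occupies the next offset in the log
        producer.commitTransaction();
    }
}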
Step 4: Exploring the Behaviour
At this point we would expect:
- One message in the topic
- Lag = 0
But the Kafka UI tool shows something unexpected: the consumer group reports a lag of 1 even though there is nothing new to read.
It feels like a bug — but it’s not. Even kafka-consumer-groups.sh shows the same:
./kafka-consumer-groups --group reading-consumer --describe --bootstrap-server localhost:9092

CURRENT-OFFSET: 1
LOG-END-OFFSET: 2
LAG: 1
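That lag figure is not something the broker stores; monitoring tools derive it as log-end offset minus committed offset. A minimal AdminClient sketch (same group and topic as above) reproduces the numbers:

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    TopicPartition tp = new TopicPartition("output-topic", 0);

    try (Admin admin = Admin.create(props)) {
        // Committed offset of the consumer group (1 in our example)
        long committed = admin.listConsumerGroupOffsets("reading-consumer")
                .partitionsToOffsetAndMetadata().get().get(tp).offset();
        // Log-end offset of the partition (2, because of the commit marker)
        long logEnd = admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
                .all().get().get(tp).offset();
        System.out.println("lag = " + (logEnd - committed)); // prints 1
    }
}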
Let’s inspect the raw Kafka segment:
kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/lib/kafka/data/output-topic-0/00000000000000000000.log \
  --print-data-log --deep-iteration

And there we see it:
Dumping /var/lib/kafka/data/output-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1754235503612 size: 75 magic: 2 compresscodec: none crc: 3767373474 isvalid: true
| offset: 0 CreateTime: 1754235503612 keySize: 3 valueSize: 4 sequence: 0 headerKeys: [] key: key payload: HOME
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true deleteHorizonMs: OptionalLong.empty position: 75 CreateTime: 1754235503944 size: 78 magic: 2 compresscodec: none crc: 1884853412 isvalid: true

So, we have:
offset: 0 → our "HOME" message
offset: 1 → COMMIT control marker (isControl=true, endTxnMarker=COMMIT)

That second “message” is not a user record — it’s a transaction marker that Kafka appends after every successful transaction.
This control record:
- Advances the offset
- Is not delivered to consumers
- Still affects lag calculations (via log-end offset)
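You can check this behaviour directly: seek a consumer to offset 1 and poll. No record comes back, and the position simply moves past the marker. A minimal sketch against the demo topic:

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    TopicPartition tp = new TopicPartition("output-topic", 0);
    try (KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
        c.assign(Collections.singletonList(tp));
        c.seek(tp, 1); // point directly at the control marker's offset
        ConsumerRecords<String, String> recs = c.poll(Duration.ofSeconds(2));
        System.out.println("records returned: " + recs.count()); // 0, markers are never delivered
        System.out.println("position now: " + c.position(tp));   // 2, the marker was skipped
    }
}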
You didn’t do anything wrong. Your consumer is healthy. The “extra message” is a transaction commit marker — invisible to readers but counted in lag.
This is how Kafka’s transactional model works under the hood — and why sometimes, lag = 1 doesn’t mean unread data.