Kafka Protobuf 1
Description

- Running a Kafka cluster locally (Docker)
- Defining the Protobuf Schema
- Producing Protobuf messages
- Consuming Protobuf messages
Prerequisites
- Linux
- JDK 21
- Maven
- Docker
Project Structure
.
├── docker-compose.yml
├── pom.xml
├── README.md
└── src
└── main
├── java
│ └── com
│ └── jreact
│ └── kafka
│ └── protobuf
│ ├── consumer
│ │ ├── GenericProtobufConsumer.java.bak
│ │ └── ProtobufConsumer.java
│ └── producer
│ └── ProtobufProducer.java
└── resources
└── protobuf
└── SimpleMessage.proto
Running Docker
docker-compose up -d
docker ps
Output:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6a619998c439 confluentinc/cp-schema-registry:7.5.2 "/etc/confluent/dock…" About an hour ago Up About an hour 0.0.0.0:8081->8081/tcp sn-schema-registry
2f8cf73b5f37 confluentinc/cp-kafka:7.5.2 "/etc/confluent/dock…" About an hour ago Up About an hour 0.0.0.0:9092->9092/tcp sn-kafka
e3487b27ef7a confluentinc/cp-zookeeper:7.5.2 "/etc/confluent/dock…" About an hour ago Up About an hour 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp sn-zookeeper
docker exec -it sn-kafka /bin/bash
cd /opt
kafka-topics --list --bootstrap-server localhost:9092
Result:
__consumer_offsets
_schemas
protobuf-topic
Defining the Protobuf Schema
SimpleMessage.proto
syntax = "proto3";
package com.jreact.protobuf;
option java_outer_classname = "SimpleMessageProtos";
message SimpleMessage {
string content = 1;
string date_time = 2;
}
Running protoc-jar-maven-plugin
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<inputDirectories>
<include>${project.basedir}/src/main/resources/protobuf</include>
</inputDirectories>
<outputTargets>
<outputTarget>
<type>java</type>
<addSources>main</addSources>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>
Result
.
...
...
├── src
│ └── main
│ ├── java
│ │ └── com
│ │ └── jreact
│ │ ├── kafka
│ │ │ └── protobuf
...
...
│ │ └── protobuf
│ │ └── SimpleMessageProtos.java
│ └── resources
│ └── protobuf
│ └── SimpleMessage.proto
└── target
...
...
ProtobufProducer.java
package com.jreact.kafka.protobuf.producer;
import com.jreact.protobuf.SimpleMessageProtos.SimpleMessage;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
public class ProtobufProducer {
public static void main(String[] args) {
ProtobufProducer protobufProducer = new ProtobufProducer();
protobufProducer.writeMessage();
}
public void writeMessage() {
//create kafka producer
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
Producer<String, SimpleMessage> producer = new KafkaProducer<>(properties);
//prepare the message
SimpleMessage simpleMessage =
SimpleMessage.newBuilder()
.setContent("Hello world")
.setDateTime(Instant.now().toString())
.build();
System.out.println(simpleMessage);
//prepare the kafka record
ProducerRecord<String, SimpleMessage> record
= new ProducerRecord<>("protobuf-topic", null, simpleMessage);
producer.send(record);
//ensures record is sent before closing the producer
producer.flush();
producer.close();
}
}
ProtobufConsumer.java
package com.jreact.kafka.protobuf.consumer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import static com.jreact.protobuf.SimpleMessageProtos.SimpleMessage;
public class ProtobufConsumer {
public static void main(String[] args) {
ProtobufConsumer protobufConsumer = new ProtobufConsumer();
protobufConsumer.readMessages();
}
public void readMessages() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, SimpleMessage.class.getName());
KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("protobuf-topic"));
//poll the record from the topic
while (true) {
ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, SimpleMessage> record : records) {
System.out.println("Message content: " + record.value().getContent());
System.out.println("Message time: " + record.value().getDateTime());
}
consumer.commitAsync();
}
}
}
Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/reactive-messaging/kafka/kafka-protobuf-1
Based on:
https://codingharbour.com/apache-kafka/how-to-use-protobuf-with-apache-kafka-and-schema-registry/