Kafka Avro 1
Description

- Running a Kafka cluster locally (Docker)
- Defining the Avro Schema
- Producing Avro messages
- Consuming Avro messages
Prerequisites
- Linux
- JDK 21
- Maven
- Docker
Project Structure
.
├── kafka-docker-compose
│ ├── kafka-ssl
│ │ ├── docker-compose.yml
│ │ ├── README.md
│ │ ├── security
│ │ │ ├── create-certificates.sh
│ │ │ ├── create-jks-certificate.sh
│ │ │ ├── create-pem-certificate.sh
│ │ │ └── remove-certs.sh
│ │ ├── start-cluster.sh
│ │ └── stop-cluster.sh
│ ├── LICENSE
│ ├── README.md
│ ├── single-node-avro-kafka
│ │ └── docker-compose.yml
│ └── single-node-kafka
│ └── docker-compose.yml
├── pom.xml
├── README.md
└── src
└── main
├── java
│ └── jreact
│ └── com
│ └── kafka
│ └── schema
│ └── registry
│ ├── consumer
│ │ ├── GenericRecordConsumer.java.bak
│ │ └── SpecificRecordConsumer.java
│ └── producer
│ ├── GenericRecordProducer.java.bak
│ └── SpecificRecordProducer.java
└── resources
├── avro
│ └── SimpleMessage.avsc
└── log4j.properties
Running Docker
Go to the folder single-node-avro-kafka and start the cluster:
docker-compose up -d
docker ps
Output:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2949b466be6c confluentinc/cp-schema-registry:7.5.2 "/etc/confluent/dock…" 6 hours ago Up 6 hours 0.0.0.0:8081->8081/tcp sn-schema-registry
981051ab7fed confluentinc/cp-kafka:7.5.2 "/etc/confluent/dock…" 6 hours ago Up 6 hours 0.0.0.0:9092->9092/tcp sn-kafka
f07a02a7f3d4 confluentinc/cp-zookeeper:7.5.2 "/etc/confluent/dock…" 6 hours ago Up 6 hours 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp sn-zookeeper
docker exec -it sn-kafka /bin/bash
cd /opt
kafka-topics --list --bootstrap-server localhost:9092
Result:
__consumer_offsets
_schemas
avro-topic
The avro-topic topic is created when the SpecificRecordProducer class sends its first message.
Defining the Avro Schema
SimpleMessage.avsc
{
"type": "record",
"name": "SimpleMessage",
"namespace": "com.jreact.kafka.schema.registry.avro",
"fields": [
{"name": "content", "type":"string", "doc": "Message content"},
{"name": "date_time", "type":"string", "doc": "Datetime when the message was generated"}
]
}
Running avro-maven-plugin
mvn clean package
Result
├── src
│ └── main
│ ├── java
│ │ └── com
│ │ └── jreact
│ │ └── kafka
│ │ └── schema
│ │ └── registry
│ │ ├── avro
│ │ │ └── SimpleMessage.java
SpecificRecordProducer.java
package com.jreact.kafka.schema.registry.producer;
import com.jreact.kafka.schema.registry.avro.SimpleMessage;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
/**
 * Sends a single Avro-encoded {@link SimpleMessage} to the {@code avro-topic} topic.
 *
 * <p>The value is serialized with {@link KafkaAvroSerializer}, configured with the
 * Schema Registry URL {@code http://localhost:8081}. The broker address is
 * {@code localhost:9092}.
 */
public class SpecificRecordProducer {

    /**
     * Entry point: creates a producer instance and sends one message.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SpecificRecordProducer specificRecordProducer = new SpecificRecordProducer();
        specificRecordProducer.writeMessage();
    }

    /**
     * Builds one {@link SimpleMessage} ("Hello world" plus the current instant) and
     * sends it to {@code avro-topic} with a {@code null} key.
     *
     * <p>The producer is always closed, even if sending or flushing throws, and
     * asynchronous delivery failures are reported on standard error instead of being
     * silently dropped.
     */
    public void writeMessage() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        // try-with-resources guarantees close() runs on every exit path
        try (Producer<String, SpecificRecord> producer = new KafkaProducer<>(properties)) {
            // create the specific record
            SimpleMessage simpleMessage = new SimpleMessage();
            simpleMessage.setContent("Hello world");
            simpleMessage.setDateTime(Instant.now().toString());

            // ProducerRecord(topic, key, value): the key is null.
            // NOTE(review): avro-topic is expected to be created on first send if it
            // does not exist yet — presumably via broker-side topic auto-creation; confirm.
            ProducerRecord<String, SpecificRecord> record =
                    new ProducerRecord<>("avro-topic", null, simpleMessage);

            // send() is asynchronous; the callback surfaces delivery errors
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Failed to send record to avro-topic: " + exception);
                }
            });

            // ensures the record is sent before the producer is closed
            producer.flush();
        }
    }
}
SpecificRecordConsumer.java
package com.jreact.kafka.schema.registry.consumer;
import com.jreact.kafka.schema.registry.avro.SimpleMessage;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Reads Avro-encoded {@link SimpleMessage} records from the {@code avro-topic} topic
 * and prints their content and timestamp to standard output.
 *
 * <p>Values are deserialized with {@link KafkaAvroDeserializer} with the specific
 * Avro reader enabled. Auto-commit is disabled; offsets are committed
 * asynchronously after each non-empty batch has been printed.
 */
public class SpecificRecordConsumer {

    /**
     * Entry point: creates a consumer instance and starts the polling loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SpecificRecordConsumer specificRecordConsumer = new SpecificRecordConsumer();
        specificRecordConsumer.readMessages();
    }

    /**
     * Subscribes to {@code avro-topic} and polls forever, printing every record.
     *
     * <p>This method does not return normally. If the loop is left through an
     * exception, the consumer is closed by the try-with-resources statement.
     */
    public void readMessages() {
        // create the Kafka consumer configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "specific-record-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        // try-with-resources closes the consumer if the loop exits via an exception
        try (KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton("avro-topic"));

            // poll the records from the topic
            while (true) {
                ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, SimpleMessage> record : records) {
                    System.out.println("Message content: " + record.value().getContent());
                    System.out.println("Message time: " + record.value().getDateTime());
                }
                // nothing new to commit after an empty poll, so skip the request
                if (!records.isEmpty()) {
                    consumer.commitAsync();
                }
            }
        }
    }
}
Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/reactive-messaging/kafka/kafka-avro-1
Based on:
https://codingharbour.com/apache-kafka/guide-to-apache-avro-and-kafka/