
Kafka Avro 1

Description

(Diagram: example-kafka-avro-1.png)

This example covers:
  • Running a Kafka cluster locally (Docker)
  • Defining the Avro Schema
  • Producing Avro messages
  • Consuming Avro messages

Prerequisites

  • Linux
  • JDK 21
  • Maven
  • Docker

Project Structure

.
├── kafka-docker-compose
│   ├── kafka-ssl
│   │   ├── docker-compose.yml
│   │   ├── README.md
│   │   ├── security
│   │   │   ├── create-certificates.sh
│   │   │   ├── create-jks-certificate.sh
│   │   │   ├── create-pem-certificate.sh
│   │   │   └── remove-certs.sh
│   │   ├── start-cluster.sh
│   │   └── stop-cluster.sh
│   ├── LICENSE
│   ├── README.md
│   ├── single-node-avro-kafka
│   │   └── docker-compose.yml
│   └── single-node-kafka
│       └── docker-compose.yml
├── pom.xml
├── README.md
└── src
    └── main
        ├── java
        │   └── com
        │       └── jreact
        │           └── kafka
        │               └── schema
        │                   └── registry
        │                       ├── consumer
        │                       │   ├── GenericRecordConsumer.java.bak
        │                       │   └── SpecificRecordConsumer.java
        │                       └── producer
        │                           ├── GenericRecordProducer.java.bak
        │                           └── SpecificRecordProducer.java
        └── resources
            ├── avro
            │   └── SimpleMessage.avsc
            └── log4j.properties

Running Docker

Go to the single-node-avro-kafka folder and start the cluster:

docker-compose up -d
docker ps

Output:

CONTAINER ID   IMAGE                                    COMMAND                  CREATED       STATUS       PORTS                                        NAMES
2949b466be6c   confluentinc/cp-schema-registry:7.5.2    "/etc/confluent/dock…"   6 hours ago   Up 6 hours   0.0.0.0:8081->8081/tcp                       sn-schema-registry
981051ab7fed   confluentinc/cp-kafka:7.5.2              "/etc/confluent/dock…"   6 hours ago   Up 6 hours   0.0.0.0:9092->9092/tcp                       sn-kafka
f07a02a7f3d4   confluentinc/cp-zookeeper:7.5.2          "/etc/confluent/dock…"   6 hours ago   Up 6 hours   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   sn-zookeeper

Open a shell in the Kafka container and list the topics:

docker exec -it sn-kafka /bin/bash
cd /opt
kafka-topics --list --bootstrap-server localhost:9092

Result:

__consumer_offsets
_schemas
avro-topic

avro-topic is created by the SpecificRecordProducer class shown below.
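
The example relies on the broker auto-creating avro-topic on the first send. If you prefer to create the topic explicitly, a minimal sketch using Kafka's AdminClient could look like the following (the partition count and replication factor are assumptions chosen for this single-node setup):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateAvroTopic {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        //try-with-resources closes the admin client when done
        try (AdminClient adminClient = AdminClient.create(properties)) {
            //1 partition, replication factor 1 – enough for a single-node cluster
            NewTopic avroTopic = new NewTopic("avro-topic", 1, (short) 1);
            adminClient.createTopics(Collections.singleton(avroTopic)).all().get();
        }
    }
}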

Defining the Avro Schema

SimpleMessage.avsc

{
  "type": "record",
  "name": "SimpleMessage",
  "namespace": "com.jreact.kafka.schema.registry.avro",
  "fields": [
    {"name": "content", "type": "string", "doc": "Message content"},
    {"name": "date_time", "type": "string", "doc": "Datetime when the message was generated"}
  ]
}
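
The same schema can also be used without code generation, the way the GenericRecord classes in the project tree do. A minimal sketch of parsing the .avsc file and building a record field by field (loading the schema from the classpath is an assumption about the resource layout):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;

public class GenericRecordExample {

    public static void main(String[] args) throws IOException {
        //parse the .avsc file shipped under src/main/resources/avro
        try (InputStream schemaStream =
                     GenericRecordExample.class.getResourceAsStream("/avro/SimpleMessage.avsc")) {
            Schema schema = new Schema.Parser().parse(schemaStream);

            //build a record field by field – no generated class involved
            GenericRecord simpleMessage = new GenericData.Record(schema);
            simpleMessage.put("content", "Hello world");
            simpleMessage.put("date_time", Instant.now().toString());

            System.out.println(simpleMessage);
        }
    }
}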

Running avro-maven-plugin

The avro-maven-plugin configured in pom.xml generates a Java class from the schema during the build:

mvn clean package

Result:

├── src
│   └── main
│       ├── java
│       │   └── com
│       │       └── jreact
│       │           └── kafka
│       │               └── schema
│       │                   └── registry
│       │                       ├── avro
│       │                       │   └── SimpleMessage.java
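
The generated SimpleMessage class is a SpecificRecord. Besides the plain setters used in the producer below, Avro-generated classes also expose a builder; a quick sketch of constructing the same message with it (assuming the standard Avro-generated builder API):

import com.jreact.kafka.schema.registry.avro.SimpleMessage;

import java.time.Instant;

public class BuilderExample {

    public static void main(String[] args) {
        //the generated builder checks that all required fields are set before build()
        SimpleMessage simpleMessage = SimpleMessage.newBuilder()
                .setContent("Hello world")
                .setDateTime(Instant.now().toString())
                .build();

        System.out.println(simpleMessage);
    }
}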

SpecificRecordProducer.java

package com.jreact.kafka.schema.registry.producer;

import com.jreact.kafka.schema.registry.avro.SimpleMessage;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Instant;
import java.util.Properties;

public class SpecificRecordProducer {

    public static void main(String[] args) {
        SpecificRecordProducer specificRecordProducer = new SpecificRecordProducer();
        specificRecordProducer.writeMessage();
    }

    public void writeMessage() {
        //create the kafka producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        Producer<String, SpecificRecord> producer = new KafkaProducer<>(properties);

        //create the specific record
        SimpleMessage simpleMessage = new SimpleMessage();
        simpleMessage.setContent("Hello world");
        simpleMessage.setDateTime(Instant.now().toString());

        //ProducerRecord(topic, key, value) – the key is null here, so the partitioner picks the partition;
        //the broker auto-creates avro-topic if it doesn't exist
        ProducerRecord<String, SpecificRecord> record = new ProducerRecord<>("avro-topic", null, simpleMessage);

        producer.send(record);
        //ensures the record is sent before closing the producer
        producer.flush();

        producer.close();
    }
}
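
producer.send() is asynchronous and returns immediately; flush() above forces delivery before close(). To get per-record confirmation (or surface errors), send() also accepts a callback. A hypothetical drop-in replacement for the producer.send(record) line above:

//same send, but log where the record landed (or the failure)
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("Sent to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});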

SpecificRecordConsumer.java

package com.jreact.kafka.schema.registry.consumer;

import com.jreact.kafka.schema.registry.avro.SimpleMessage;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SpecificRecordConsumer {

    public static void main(String[] args) {
        SpecificRecordConsumer specificRecordConsumer = new SpecificRecordConsumer();
        specificRecordConsumer.readMessages();
    }

    public void readMessages() {
        //create the kafka consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "specific-record-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

        properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        //deserialize into the generated SimpleMessage class instead of a GenericRecord
        properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singleton("avro-topic"));

        //poll records from the topic
        while (true) {
            ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, SimpleMessage> record : records) {
                System.out.println("Message content: " + record.value().getContent());
                System.out.println("Message time: " + record.value().getDateTime());
            }
            consumer.commitAsync();
        }
    }
}
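
The while (true) loop above never exits, so consumer.close() is never called. A common pattern is to call consumer.wakeup() from a shutdown hook, catch the resulting WakeupException, and close the consumer in a finally block. A sketch of how the poll loop in readMessages() could be wrapped (requires an extra import of org.apache.kafka.common.errors.WakeupException):

//sketch: break the poll loop cleanly on Ctrl+C
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();          //makes the blocked poll() throw WakeupException
    try {
        mainThread.join();      //wait until the loop below has finished cleaning up
    } catch (InterruptedException ignored) {
    }
}));

try {
    while (true) {
        ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, SimpleMessage> record : records) {
            System.out.println("Message content: " + record.value().getContent());
            System.out.println("Message time: " + record.value().getDateTime());
        }
        consumer.commitAsync();
    }
} catch (WakeupException e) {
    //expected on shutdown, nothing to do
} finally {
    consumer.commitSync();      //commit the last processed offsets synchronously
    consumer.close();
}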

Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/reactive-messaging/kafka/kafka-avro-1


Based on:
https://codingharbour.com/apache-kafka/guide-to-apache-avro-and-kafka/