Schema Registry Spring Boot
Description

The purpose of the Example is to configure a basic Kafka producer and consumer that will produce and consume messages from the same topic, using a pre defined avro schema for the message sent.
Prerequisites
- Linux
- JDK 21
- Maven
- Docker
- Spring Boot
Project Structure
.
├── docker
│ ├── docker-compose.yml
│ ├── generate_certificates.sh
│ ├── kafka
│ │ └── secrets
│ │ ├── kafka.keystore.jks
│ │ ├── kafka_secret.txt
│ │ ├── kafka.truststore.jks
│ │ ├── schema-registry.keystore.jks
│ │ └── schema-registry.truststore.jks
│ └── kafka-demo-app
│ ├── docker-entrypoint.sh
│ └── Dockerfile
├── pom.xml
├── README.md
└── src
├── main
│ ├── java
│ │ └── com
│ │ └── kafka
│ │ └── demo
│ │ ├── config
│ │ │ ├── KafkaConsumerConfig.java
│ │ │ └── KafkaProducerConfig.java
│ │ ├── consumer
│ │ │ └── KafkaConsumer.java
│ │ ├── DemoApplication.java
│ │ ├── KafkaRunner.java
│ │ ├── producer
│ │ │ └── KafkaProducer.java
│ │ └── properties
│ │ └── KafkaCustomProperties.java
│ └── resources
│ ├── application.properties
│ ├── avro
│ │ └── TransactionEvent.avsc
│ ├── server-keystore.jks
│ └── server-truststore.jks
└── test
└── java
└── com
└── kafka
└── demo
└── DemoApplicationTests.java
Secrets
Copy resources/server-keystore.jks, server-truststore.jks to
/var/kafka-demo-app/conf
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.3</version>
</parent>
<groupId>com.kafka</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>POC for producing and consuming kakfa messages in a spring-boot app</description>
<properties>
<java.version>21</java.version>
<avro-maven-plugin.version>1.11.0</avro-maven-plugin.version>
<kafka-avro-serializer.version>7.5.1</kafka-avro-serializer.version>
<apache-avro.version>1.11.3</apache-avro.version>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Avro schema related dependencies for serialization/deserialization -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${apache-avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka-avro-serializer.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>kafka-demo-app-${project.version}</finalName>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<layout>ZIP</layout>
<executable>true</executable>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<!-- avro-maven-plugin -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro-maven-plugin.version}</version>
<executions>
<execution>
<id>avro</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<name>confluent</name>
<url>https://packages.confluent.io/maven/</url>
<layout>default</layout>
</repository>
</repositories>
</project>
Notice in pom.xml
<!-- Avro schema related dependencies for serialization/deserialization -->
and
<!-- avro-maven-plugin -->
Avro schema
We use a avro schema called TransactionEvent of type “record” in .avsc format which needs to be placed under src/main/resources/avro. Based on the structure of this file, the avro-maven-plugin will generate the associated Java classes under the package com.kafka.demo.avro.model.
See Project Structure - above.
and
After mvn clean package you should have:
── target
├── classes
│ ├── application.properties
│ ├── avro
│ │ └── TransactionEvent.avsc
│ ├── com
│ │ └── kafka
│ │ └── demo
│ │ ├── avro
│ │ │ └── model
│ │ │ ├── TransactionEvent$Builder.class
│ │ │ ├── TransactionEventBody$Builder.class
│ │ │ ├── TransactionEventBody.class
│ │ │ ├── TransactionEvent.class
│ │ │ ├── TransactionEventHeader$Builder.class
│ │ │ ├── TransactionEventHeader.class
│ │ │ └── TransactionType.class
│ │ ├── config
│ │ │ ├── KafkaConsumerConfig.class
│ │ │ └── KafkaProducerConfig.class
│ │ ├── consumer
│ │ │ └── KafkaConsumer.class
│ │ ├── DemoApplication.class
│ │ ├── KafkaRunner.class
│ │ ├── producer
│ │ │ └── KafkaProducer.class
│ │ └── properties
│ │ └── KafkaCustomProperties.class
│ ├── server-keystore.jks
│ └── server-truststore.jks
├── generated-sources
│ └── annotations
├── generated-test-sources
│ └── test-annotations
├── kafka-demo-app-0.0.1-SNAPSHOT.jar
├── kafka-demo-app-0.0.1-SNAPSHOT.jar.original
and
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── kafka
│ │ │ └── demo
│ │ │ ├── avro
│ │ │ │ └── model
│ │ │ │ ├── TransactionEventBody.java
│ │ │ │ ├── TransactionEventHeader.java
│ │ │ │ ├── TransactionEvent.java
│ │ │ │ └── TransactionType.java
│ │ │ ├── config
│ │ │ │ ├── KafkaConsumerConfig.java
│ │ │ │ └── KafkaProducerConfig.java
│ │ │ ├── consumer
│ │ │ │ └── KafkaConsumer.java
│ │ │ ├── DemoApplication.java
│ │ │ ├── KafkaRunner.java
│ │ │ ├── producer
│ │ │ │ └── KafkaProducer.java
│ │ │ └── properties
│ │ │ └── KafkaCustomProperties.java
Run Example
docker-compose up
Example Docker Output:
# ...
schema-registry_1 | [2025-01-05 08:40:03,480] INFO Kafka commitId: c43d449ca7777b4e (org.apache.kafka.common.utils.AppInfoParser)
schema-registry_1 | [2025-01-05 08:40:03,481] INFO Kafka startTimeMs: 1736066403480 (org.apache.kafka.common.utils.AppInfoParser)
schema-registry_1 | [2025-01-05 08:40:03,499] INFO [Schema registry clientId=sr-1, groupId=schema-registry] Cluster ID: K438Gk-kRxGulCD3BGdWaQ (org.apache.kafka.clients.Metadata)
kafka_1 | [2025-01-05 08:40:03,504] INFO Creating topic __consumer_offsets with configuration {compression.type=producer, cleanup.policy=compact, segment.bytes=104857600} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1), 3 -> ArrayBuffer(1), 4 -> ArrayBuffer(1), 5 -> ArrayBuffer(1), 6 -> ArrayBuffer(1), 7 -> ArrayBuffer(1), 8 -> ArrayBuffer(1), 9 -> ArrayBuffer(1), 10 -> ArrayBuffer(1), 11 -> ArrayBuffer(1), 12 -> ArrayBuffer(1), 13 -> ArrayBuffer(1), 14 -> ArrayBuffer(1), 15 -> ArrayBuffer(1), 16 -> ArrayBuffer(1), 17 -> ArrayBuffer(1), 18 -> ArrayBuffer(1), 19 -> ArrayBuffer(1), 20 -> ArrayBuffer(1), 21 -> ArrayBuffer(1), 22 -> ArrayBuffer(1), 23 -> ArrayBuffer(1), 24 -> ArrayBuffer(1), 25 -> ArrayBuffer(1), 26 -> ArrayBuffer(1), 27 -> ArrayBuffer(1), 28 -> ArrayBuffer(1), 29 -> ArrayBuffer(1), 30 -> ArrayBuffer(1), 31 -> ArrayBuffer(1), 32 -> ArrayBuffer(1), 33 -> ArrayBuffer(1), 34 -> ArrayBuffer(1), 35 -> ArrayBuffer(1), 36 -> ArrayBuffer(1), 37 -> ArrayBuffer(1), 38 -> ArrayBuffer(1), 39 -> ArrayBuffer(1), 40 -> ArrayBuffer(1), 41 -> ArrayBuffer(1), 42 -> ArrayBuffer(1), 43 -> ArrayBuffer(1), 44 -> ArrayBuffer(1), 45 -> ArrayBuffer(1), 46 -> ArrayBuffer(1), 47 -> ArrayBuffer(1), 48 -> ArrayBuffer(1), 49 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
kafka_1 | [2025-01-05 08:40:03,516] INFO [Controller id=1] New topics: [Set(__consumer_offsets)], deleted topics: [HashSet()], new partition replica assignment [Set(TopicIdReplicaAssignment(__consumer_offsets,Some(sCl4AQdgQzysT-cqZX-oGA),HashMap(__consumer_offsets-22 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-30 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-25 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-35 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-37 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-38 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-13 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-8 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-21 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-4 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-27 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-7 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-9 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-46 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-41 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-33 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-23 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-49 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-47 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-16 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-28 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-31 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-36 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-42 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-3 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-18 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-15 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-24 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-17 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-48 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-19 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-11 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-2 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-43 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-6 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-14 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-20 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-44 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-39 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-12 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-45 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-1 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-5 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-26 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-29 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-34 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-10 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-32 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-40 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=))))] (kafka.controller.KafkaController)
# ...
On the base:
Implementing a basic Kafka Producer and Consumer using Spring Boot, Spring Kafka and Avro schema
Implementing a basic Kafka Producer and Consumer using Spring Boot, Spring Kafka and Avro schema (github)