
Schema Registry Spring Boot

Description

[Image: schema-registry-spring-boot-01.png]

The purpose of this example is to configure a basic Kafka producer and consumer that produce and consume messages on the same topic, using a predefined Avro schema for the messages sent.

Prerequisites

  • Linux
  • JDK 21
  • Maven
  • Docker
  • Spring Boot

Project Structure

.
├── docker
│   ├── docker-compose.yml
│   ├── generate_certificates.sh
│   ├── kafka
│   │   └── secrets
│   │       ├── kafka.keystore.jks
│   │       ├── kafka_secret.txt
│   │       ├── kafka.truststore.jks
│   │       ├── schema-registry.keystore.jks
│   │       └── schema-registry.truststore.jks
│   └── kafka-demo-app
│       ├── docker-entrypoint.sh
│       └── Dockerfile
├── pom.xml
├── README.md
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── kafka
    │   │           └── demo
    │   │               ├── config
    │   │               │   ├── KafkaConsumerConfig.java
    │   │               │   └── KafkaProducerConfig.java
    │   │               ├── consumer
    │   │               │   └── KafkaConsumer.java
    │   │               ├── DemoApplication.java
    │   │               ├── KafkaRunner.java
    │   │               ├── producer
    │   │               │   └── KafkaProducer.java
    │   │               └── properties
    │   │                   └── KafkaCustomProperties.java
    │   └── resources
    │       ├── application.properties
    │       ├── avro
    │       │   └── TransactionEvent.avsc
    │       ├── server-keystore.jks
    │       └── server-truststore.jks
    └── test
        └── java
            └── com
                └── kafka
                    └── demo
                        └── DemoApplicationTests.java

Secrets

Copy src/main/resources/server-keystore.jks and src/main/resources/server-truststore.jks to
/var/kafka-demo-app/conf
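The application can then point its SSL configuration at the copied files. As a sketch, an application.properties fragment might look like the following (the property keys are the standard Spring Boot Kafka SSL keys; the passwords and exact values here are illustrative assumptions, not the project's real configuration):

```properties
# Illustrative values only - the real settings live in src/main/resources/application.properties
spring.kafka.security.protocol=SSL
spring.kafka.ssl.key-store-location=file:/var/kafka-demo-app/conf/server-keystore.jks
spring.kafka.ssl.key-store-password=changeit
spring.kafka.ssl.trust-store-location=file:/var/kafka-demo-app/conf/server-truststore.jks
spring.kafka.ssl.trust-store-password=changeit
```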

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.3</version>
    </parent>
    <groupId>com.kafka</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>POC for producing and consuming kafka messages in a spring-boot app</description>
    <properties>
        <java.version>21</java.version>
        <avro-maven-plugin.version>1.11.0</avro-maven-plugin.version>
        <kafka-avro-serializer.version>7.5.1</kafka-avro-serializer.version>
        <apache-avro.version>1.11.3</apache-avro.version>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Avro schema related dependencies for serialization/deserialization -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${apache-avro.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${kafka-avro-serializer.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>kafka-demo-app-${project.version}</finalName>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <layout>ZIP</layout>
                    <executable>true</executable>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>

            <!-- avro-maven-plugin -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro-maven-plugin.version}</version>
                <executions>
                    <execution>
                        <id>avro</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>confluent</id>
            <name>confluent</name>
            <url>https://packages.confluent.io/maven/</url>
            <layout>default</layout>
        </repository>
    </repositories>
</project>

Notice the two marked sections in pom.xml:

<!-- Avro schema related dependencies for serialization/deserialization -->

and

<!-- avro-maven-plugin -->
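For context on what kafka-avro-serializer puts on the wire: each serialized record is prefixed with a magic byte and the 4-byte ID of the schema registered in Schema Registry (the Confluent wire format), followed by the Avro binary payload. A minimal sketch of that framing in plain Java (the class and method names here are illustrative, not part of the example project):

```java
import java.nio.ByteBuffer;

// Sketch of the Confluent wire format used by kafka-avro-serializer:
// [magic byte 0x00][4-byte big-endian schema ID][Avro binary payload]
public class ConfluentWireFormat {

    public static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(5 + avroPayload.length)
                .put((byte) 0x00)   // magic byte: wire-format version 0
                .putInt(schemaId)   // schema ID assigned by Schema Registry
                .put(avroPayload)   // the Avro-encoded record body
                .array();
    }

    public static int schemaId(byte[] framedMessage) {
        // Skip the magic byte, read the next 4 bytes as a big-endian int
        return ByteBuffer.wrap(framedMessage, 1, 4).getInt();
    }
}
```

This is why a consumer configured with KafkaAvroDeserializer needs the Schema Registry URL: it reads the schema ID from the message prefix and fetches the writer's schema before decoding the payload.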

Avro schema

We use an Avro schema called TransactionEvent of type "record" in .avsc format, which needs to be placed under src/main/resources/avro. Based on the structure of this file, the avro-maven-plugin generates the associated Java classes under the package com.kafka.demo.avro.model.
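The exact schema ships with the project; for illustration, a record of this shape could look like the sketch below. Only the record and enum names mirror the generated classes listed later (TransactionEvent, TransactionEventHeader, TransactionEventBody, TransactionType); the field names, field types, and enum symbols are assumptions:

```json
{
  "type": "record",
  "name": "TransactionEvent",
  "namespace": "com.kafka.demo.avro.model",
  "fields": [
    {
      "name": "header",
      "type": {
        "type": "record",
        "name": "TransactionEventHeader",
        "fields": [
          { "name": "eventId", "type": "string" },
          { "name": "timestamp", "type": "long" }
        ]
      }
    },
    {
      "name": "body",
      "type": {
        "type": "record",
        "name": "TransactionEventBody",
        "fields": [
          {
            "name": "transactionType",
            "type": { "type": "enum", "name": "TransactionType", "symbols": ["DEPOSIT", "WITHDRAWAL"] }
          },
          { "name": "amount", "type": "double" }
        ]
      }
    }
  ]
}
```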

See Project Structure - above.

and

After mvn clean package you should have:

── target
   ├── classes
   │   ├── application.properties
   │   ├── avro
   │   │   └── TransactionEvent.avsc
   │   ├── com
   │   │   └── kafka
   │   │       └── demo
   │   │           ├── avro
   │   │           │   └── model
   │   │           │       ├── TransactionEvent$Builder.class
   │   │           │       ├── TransactionEventBody$Builder.class
   │   │           │       ├── TransactionEventBody.class
   │   │           │       ├── TransactionEvent.class
   │   │           │       ├── TransactionEventHeader$Builder.class
   │   │           │       ├── TransactionEventHeader.class
   │   │           │       └── TransactionType.class
   │   │           ├── config
   │   │           │   ├── KafkaConsumerConfig.class
   │   │           │   └── KafkaProducerConfig.class
   │   │           ├── consumer
   │   │           │   └── KafkaConsumer.class
   │   │           ├── DemoApplication.class
   │   │           ├── KafkaRunner.class
   │   │           ├── producer
   │   │           │   └── KafkaProducer.class
   │   │           └── properties
   │   │               └── KafkaCustomProperties.class
   │   ├── server-keystore.jks
   │   └── server-truststore.jks
   ├── generated-sources
   │   └── annotations
   ├── generated-test-sources
   │   └── test-annotations
   ├── kafka-demo-app-0.0.1-SNAPSHOT.jar
   ├── kafka-demo-app-0.0.1-SNAPSHOT.jar.original
and

├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── kafka
│   │   │           └── demo
│   │   │               ├── avro
│   │   │               │   └── model
│   │   │               │       ├── TransactionEventBody.java
│   │   │               │       ├── TransactionEventHeader.java
│   │   │               │       ├── TransactionEvent.java
│   │   │               │       └── TransactionType.java
│   │   │               ├── config
│   │   │               │   ├── KafkaConsumerConfig.java
│   │   │               │   └── KafkaProducerConfig.java
│   │   │               ├── consumer
│   │   │               │   └── KafkaConsumer.java
│   │   │               ├── DemoApplication.java
│   │   │               ├── KafkaRunner.java
│   │   │               ├── producer
│   │   │               │   └── KafkaProducer.java
│   │   │               └── properties
│   │   │                   └── KafkaCustomProperties.java

Run Example

docker-compose up
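The compose file in docker/ wires up the broker and Schema Registry (the log output below also shows a ZooKeeper-backed broker). A stripped-down sketch of the two relevant services follows; the image tags, ports, and environment values here are illustrative assumptions, and the real file additionally configures SSL with the generated keystores under docker/kafka/secrets:

```yaml
# Illustrative sketch - see docker/docker-compose.yml for the real definitions
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.1
    ports:
      - "9092:9092"
    # listener, ZooKeeper, and SSL keystore settings omitted

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.1
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
```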

Example Docker Output:


# ...

schema-registry_1 | [2025-01-05 08:40:03,480] INFO Kafka commitId: c43d449ca7777b4e (org.apache.kafka.common.utils.AppInfoParser)
schema-registry_1 | [2025-01-05 08:40:03,481] INFO Kafka startTimeMs: 1736066403480 (org.apache.kafka.common.utils.AppInfoParser)
schema-registry_1 | [2025-01-05 08:40:03,499] INFO [Schema registry clientId=sr-1, groupId=schema-registry] Cluster ID: K438Gk-kRxGulCD3BGdWaQ (org.apache.kafka.clients.Metadata)
kafka_1 | [2025-01-05 08:40:03,504] INFO Creating topic __consumer_offsets with configuration {compression.type=producer, cleanup.policy=compact, segment.bytes=104857600} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1), 3 -> ArrayBuffer(1), 4 -> ArrayBuffer(1), 5 -> ArrayBuffer(1), 6 -> ArrayBuffer(1), 7 -> ArrayBuffer(1), 8 -> ArrayBuffer(1), 9 -> ArrayBuffer(1), 10 -> ArrayBuffer(1), 11 -> ArrayBuffer(1), 12 -> ArrayBuffer(1), 13 -> ArrayBuffer(1), 14 -> ArrayBuffer(1), 15 -> ArrayBuffer(1), 16 -> ArrayBuffer(1), 17 -> ArrayBuffer(1), 18 -> ArrayBuffer(1), 19 -> ArrayBuffer(1), 20 -> ArrayBuffer(1), 21 -> ArrayBuffer(1), 22 -> ArrayBuffer(1), 23 -> ArrayBuffer(1), 24 -> ArrayBuffer(1), 25 -> ArrayBuffer(1), 26 -> ArrayBuffer(1), 27 -> ArrayBuffer(1), 28 -> ArrayBuffer(1), 29 -> ArrayBuffer(1), 30 -> ArrayBuffer(1), 31 -> ArrayBuffer(1), 32 -> ArrayBuffer(1), 33 -> ArrayBuffer(1), 34 -> ArrayBuffer(1), 35 -> ArrayBuffer(1), 36 -> ArrayBuffer(1), 37 -> ArrayBuffer(1), 38 -> ArrayBuffer(1), 39 -> ArrayBuffer(1), 40 -> ArrayBuffer(1), 41 -> ArrayBuffer(1), 42 -> ArrayBuffer(1), 43 -> ArrayBuffer(1), 44 -> ArrayBuffer(1), 45 -> ArrayBuffer(1), 46 -> ArrayBuffer(1), 47 -> ArrayBuffer(1), 48 -> ArrayBuffer(1), 49 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
kafka_1 | [2025-01-05 08:40:03,516] INFO [Controller id=1] New topics: [Set(__consumer_offsets)], deleted topics: [HashSet()], new partition replica assignment [Set(TopicIdReplicaAssignment(__consumer_offsets,Some(sCl4AQdgQzysT-cqZX-oGA),HashMap(__consumer_offsets-22 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-30 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-25 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-35 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-37 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-38 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-13 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-8 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-21 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-4 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-27 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-7 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-9 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-46 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-41 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-33 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-23 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-49 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-47 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), 
__consumer_offsets-16 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-28 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-31 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-36 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-42 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-3 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-18 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-15 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-24 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-17 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-48 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-19 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-11 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-2 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-43 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-6 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-14 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-20 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-44 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-39 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-12 -> ReplicaAssignment(replicas=1, addingReplicas=, 
removingReplicas=), __consumer_offsets-45 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-1 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-5 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-26 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-29 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-34 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-10 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-32 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), __consumer_offsets-40 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=))))] (kafka.controller.KafkaController)

# ...

Based on:
Implementing a basic Kafka Producer and Consumer using Spring Boot, Spring Kafka and Avro schema
Implementing a basic Kafka Producer and Consumer using Spring Boot, Spring Kafka and Avro schema (github)