Kafka Streams Getting Started
Description

- Input Stream - an input Kafka topic that receives video titles together with a number of views per video
- Consumer / Processor - a Kafka Streams application that reads the video view data and counts the number of views each video is getting
- Processor / Producer - Kafka Streams writes the running total of views per video into an output topic
- The output topic at any point holds the total number of views each of our videos has received.
Prerequisites
- Maven
- JDK 1.8+
- Docker / Docker Desktop
- Docker apache/kafka:3.9.0
Create the input and output topics
Set up Docker apache/kafka
docker pull apache/kafka:3.9.0
docker run -p 9092:9092 apache/kafka:3.9.0
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8fd96b3b82b5 apache/kafka:3.9.0 "/__cacert_entrypoin…" 44 hours ago Up 29 seconds 0.0.0.0:9092->9092/tcp nifty_curie
Container name (above): nifty_curie
docker exec -it nifty_curie /bin/bash
cd /opt/kafka
Create topics
input-topic
bin/kafka-topics.sh --create --topic input-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
output-topic
bin/kafka-topics.sh --create --topic output-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
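If you would rather create the topics from Java instead of the shell, a minimal sketch using Kafka's AdminClient (pulled in transitively by the kafka-streams dependency) could look like the following; the CreateTopics class name is just for illustration.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // create both topics with 1 partition and replication factor 1,
            // matching the kafka-topics.sh commands above
            admin.createTopics(Arrays.asList(
                    new NewTopic("input-topic", 1, (short) 1),
                    new NewTopic("output-topic", 1, (short) 1)
            )).all().get(); // block until the broker has created the topics
        }
    }
}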
Project Structure
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-streams-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>KafkaStreamsDemo</mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
KafkaStreamsDemo.java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class KafkaStreamsDemo {
    public static void main(String[] args) {
        // initialise a default logging mechanism
        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.INFO);

        // set up String and Long Serdes to help with serialising and deserialising the kafka data
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        // build the kafka streams topology
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> views = builder.stream(
                "input-topic",
                Consumed.with(stringSerde, stringSerde)
        );
        KTable<String, Long> totalViews = views
                .mapValues(v -> Long.parseLong(v))
                .groupByKey(Grouped.with(stringSerde, longSerde))
                .reduce(Long::sum);
        totalViews.toStream().to("output-topic", Produced.with(stringSerde, longSerde));

        // create a KafkaStreams object
        final Properties props = new Properties();
        props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-totalviews");
        props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // register the shutdown hook before blocking on the latch,
        // otherwise the hook would never be installed and the application could not shut down cleanly
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-totalviews") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        // start the kafka streams application and block until the shutdown hook releases the latch
        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
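If you want to exercise the topology without a running broker, one option is Kafka's TopologyTestDriver from the kafka-streams-test-utils artifact (an extra test-scope dependency, same version as kafka-streams, 2.8.0 here). The sketch below rebuilds the same topology and pipes a couple of records through it; the TopologyTest class name is just for illustration.

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class TopologyTest {
    public static void main(String[] args) {
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        // same topology as KafkaStreamsDemo
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(stringSerde, stringSerde))
                .mapValues(v -> Long.parseLong(v))
                .groupByKey(Grouped.with(stringSerde, longSerde))
                .reduce(Long::sum)
                .toStream()
                .to("output-topic", Produced.with(stringSerde, longSerde));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-totalviews-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input =
                    driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output =
                    driver.createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

            input.pipeInput("Video1", "5");
            input.pipeInput("Video1", "4");

            // prints {Video1=9} - the running total for Video1
            System.out.println(output.readKeyValuesToMap());
        }
    }
}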
Running the example
mvn clean package
java -jar target/kafka-streams-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
In a second terminal, exec into the Kafka container again (docker exec -it nifty_curie /bin/bash, then cd /opt/kafka) and start a kafka producer using the following command:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic input-topic \
--property key.separator=":" \
--property parse.key=true
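With parse.key=true and key.separator=":", a line such as Video1:5 is split into the record key Video1 and the value 5, which is exactly the key/value shape the mapValues step in the topology expects.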
In a third terminal, exec into the Kafka container in the same way and start a kafka consumer using the following command:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic output-topic \
--property print.key=true \
--property key.separator="-" \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
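The LongDeserializer is needed for the values because the streams application writes the running totals with the Long serde (Produced.with(stringSerde, longSerde)); without it the console consumer would print the raw serialised bytes instead of readable numbers.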
In the Producer terminal send the following:
Video1:5
Video2:10
Video1:4
Video1:0
Video2:7
Video3:12
Video1:7
In the Consumer terminal you will see output like the following:
Video2-17
Video3-12
Video1-16
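These are the running totals for the input above: Video1 = 5 + 4 + 0 + 7 = 16, Video2 = 10 + 7 = 17, and Video3 = 12. Depending on the commit interval and record cache settings, you may also see intermediate totals (for example Video1-5 or Video1-9) before the final values appear.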
Based on: https://adityasridhar.com/posts/how-to-get-started-with-kafka-streams_