Flink Kafka Word Count
Source, operator and sink in DataStream API
Description
This is a re-write of the Apache Flink WordCount example using Kafka connectors.

Prerequisites
- JDK 11
- Maven
- IDE (IntelliJ IDEA)
- Flink 1.17.1
- Docker / Docker Desktop
- Docker image apache/kafka:3.9.0
Data Setup
docker pull apache/kafka:3.9.0
docker run -p 9092:9092 apache/kafka:3.9.0
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5076e71c54cd apache/kafka:3.9.0 "/__cacert_entrypoin…" 5 hours ago Up 5 hours 0.0.0.0:9092->9092/tcp fervent_dijkstra
In our case (above):
Container name: fervent_dijkstra
Create topics:
docker exec -it fervent_dijkstra /bin/bash
cd /opt/kafka
bin/kafka-topics.sh --create --topic inputTopic --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic outputTopic --bootstrap-server localhost:9092
Project Structure
.
├── pom.xml
├── README.md
├── src
│ └── main
│ ├── java
│ │ └── io
│ │ └── redpanda
│ │ └── examples
│ │ └── WordCount.java
│ └── resources
│ └── log4j2.properties
└── target
WordCount.java
/**
 * Streaming word count that reads lines of text from one Kafka topic and writes
 * running per-word counts to another Kafka topic.
 *
 * <p>Pipeline: {@code inputTopic} -> {@link Tokenizer} -> keyBy(word) -> sum(count)
 * -> {@link Reducer} -> {@code outputTopic}.
 */
public class WordCount {
    static final String inputTopic = "inputTopic";
    static final String outputTopic = "outputTopic";
    static final String jobTitle = "WordCount";

    /**
     * Builds the Kafka source, the Kafka sink and the word-count pipeline, then runs the job.
     *
     * @param args optional; {@code args[0]} is the Kafka bootstrap servers string,
     *             defaulting to {@code "localhost:9092"} when no argument is given
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: string values from inputTopic, starting at the earliest offsets
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(inputTopic)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Sink: each result string becomes the value of one record in outputTopic
        KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
                .setValueSerializationSchema(new SimpleStringSchema())
                .setTopic(outputTopic)
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(serializer)
                .build();

        DataStream<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Split up the lines in pairs (2-tuples) containing: (word,1)
        DataStream<String> counts = text.flatMap(new Tokenizer())
                // Group by the tuple field "0" and sum up tuple field "1"
                .keyBy(value -> value.f0)
                .sum(1)
                // Render each (word,count) pair as a single string
                .flatMap(new Reducer());

        // Add the sink so that results are written to the outputTopic
        counts.sinkTo(sink);

        // Execute program
        env.execute(jobTitle);
    }

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        // Compiled once instead of on every record; matches runs of non-word characters.
        private static final java.util.regex.Pattern NON_WORD =
                java.util.regex.Pattern.compile("\\W+");

        /**
         * Lower-cases the line, splits it on runs of non-word characters and emits
         * {@code (token, 1)} for every non-empty token.
         *
         * @param value the input line
         * @param out   collector receiving one pair per word
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Normalize with a fixed locale so the result does not depend on the JVM's
            // default locale, then split the line
            String[] tokens = NON_WORD.split(value.toLowerCase(java.util.Locale.ROOT));

            // Emit the pairs; a line that starts with a separator yields a leading empty
            // token, which is skipped here
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    /**
     * Implements a simple reducer using FlatMap to reduce the Tuple2 into a single string for
     * writing to Kafka topics. Exactly one string is emitted per input pair.
     */
    public static final class Reducer
            implements FlatMapFunction<Tuple2<String, Integer>, String> {

        /**
         * Emits the pair formatted as {@code "<word> <count>"}.
         *
         * @param value the (word, count) pair
         * @param out   collector receiving the formatted string
         */
        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<String> out) {
            // Convert the pair to a string for easy writing to the Kafka topic
            String count = value.f0 + " " + value.f1;
            out.collect(count);
        }
    }
}
Run
# Run the main class (e.g. from the IDE):
WordCount.java
Input
bin/kafka-console-producer.sh --topic inputTopic --bootstrap-server localhost:9092
>a
>a b
>a b c
>a b c d
>
Output
# In a second terminal
bin/kafka-console-consumer.sh --topic outputTopic --from-beginning --bootstrap-server localhost:9092
a 1
a 2
b 1
a 3
b 2
c 1
c 2
b 3
a 4
d 1
Source code:
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/flink-kafka-word-count