Flink Kafka Word Count

Source, operator and sink in DataStream API

Description

This is a rewrite of the Apache Flink WordCount example that reads its input from and writes its results to Kafka using the Flink Kafka connectors.

[Figure: flink-kafka-word-count-01.png]

Prerequisites

  • JDK 11
  • Maven
  • IDE (IntelliJ IDEA)
  • Flink 1.17.1
  • Docker / Docker Desktop
  • Docker apache/kafka:3.9.0

Data Setup

docker pull apache/kafka:3.9.0
docker run -d -p 9092:9092 apache/kafka:3.9.0
docker ps

CONTAINER ID   IMAGE                COMMAND                  CREATED       STATUS       PORTS                    NAMES
5076e71c54cd   apache/kafka:3.9.0   "/__cacert_entrypoin…"   5 hours ago   Up 5 hours   0.0.0.0:9092->9092/tcp   fervent_dijkstra

The -d flag runs the container detached so the same terminal can be used for the next commands.

In this run, the container name is fervent_dijkstra. Docker assigns a random name, so yours will differ — substitute it in the commands below.

Create topics:

docker exec -it fervent_dijkstra /bin/bash
cd /opt/kafka
bin/kafka-topics.sh --create --topic inputTopic --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic outputTopic --bootstrap-server localhost:9092

Project Structure

.
├── pom.xml
├── README.md
├── src
│   └── main
│       ├── java
│       │   └── io
│       │       └── redpanda
│       │           └── examples
│       │               └── WordCount.java
│       └── resources
│           └── log4j2.properties
└── target
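
The KafkaSource and KafkaSink classes used below live in a separate connector artifact, not in the core Flink dependency. A pom.xml sketch of the dependencies this example assumes — the artifact versions here are assumptions matched to the Flink 1.17.1 prerequisite, so verify them against Maven Central for your setup:

```xml
<dependencies>
  <!-- Core DataStream API (version assumed to match Flink 1.17.1) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.17.1</version>
  </dependency>
  <!-- Kafka connector providing KafkaSource / KafkaSink -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
  </dependency>
  <!-- Needed to launch the job from the IDE -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.17.1</version>
  </dependency>
</dependencies>
```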

WordCount.java

package io.redpanda.examples;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {

    static final String inputTopic = "inputTopic";
    static final String outputTopic = "outputTopic";
    static final String jobTitle = "WordCount";

    public static void main(String[] args) throws Exception {
        final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(inputTopic)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaRecordSerializationSchema<String> serializer = KafkaRecordSerializationSchema.builder()
                .setValueSerializationSchema(new SimpleStringSchema())
                .setTopic(outputTopic)
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(serializer)
                .build();

        DataStream<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Split the lines into pairs (2-tuples) of the form (word, 1)
        DataStream<String> counts = text.flatMap(new Tokenizer())
                // Group by tuple field "0" and sum up tuple field "1"
                .keyBy(value -> value.f0)
                .sum(1)
                .flatMap(new Reducer());

        // Add the sink so results are written to the outputTopic
        counts.sinkTo(sink);

        // Execute program
        env.execute(jobTitle);
    }

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // Emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    /**
     * Implements a simple reducer as a FlatMapFunction that flattens each
     * Tuple2 into a single "word count" string for writing to the Kafka topic.
     */
    public static final class Reducer
            implements FlatMapFunction<Tuple2<String, Integer>, String> {

        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<String> out) {
            // Convert the pair to a string for writing to the Kafka topic
            out.collect(value.f0 + " " + value.f1);
        }
    }
}
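
The length check in Tokenizer matters because String.split can return an empty leading token when a line starts with a non-word character. The same normalization can be checked outside Flink; a minimal plain-Java sketch (class and method names here are illustrative, not part of the example project):

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizerCheck {
    // Same normalization as Tokenizer.flatMap: lowercase, split on runs
    // of non-word characters, and drop empty tokens.
    static List<String> tokenize(String value) {
        List<String> tokens = new ArrayList<>();
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello, World!"));   // [hello, world]
        System.out.println(tokenize("...leading dots")); // [leading, dots]
    }
}
```

The second call is the case the length check guards against: splitting "...leading dots" yields an empty first element, which would otherwise be counted as a word.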

Run

Run the WordCount class from your IDE. To point the job at a different broker, pass its address as the first program argument (it defaults to localhost:9092).

Input

In the container shell from the Data Setup step, start a console producer and type a few lines:

bin/kafka-console-producer.sh --topic inputTopic --bootstrap-server localhost:9092

>a
>a b
>a b c
>a b c d
>

Output

In a second terminal, exec into the container again and consume the results:

bin/kafka-console-consumer.sh --topic outputTopic --from-beginning --bootstrap-server localhost:9092

a 1
a 2
b 1
a 3
b 2
c 1
c 2
b 3
a 4
d 1


Source code:
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/flink-kafka-word-count