Tumbling Window
Description
Prerequisites
- JDK 11
- Maven
- IDE (IntelliJ IDEA, Eclipse)
- Flink 1.20.0
Project Structure
.
├── dependency-reduced-pom.xml
├── pom.xml
├── README.md
└─ ─ src
└── main
├── java
│ ├── com
│ │ └── jreact
│ │ └── flink
│ │ └── TumblingWindow.java
│ └── model
│ └── WordWithCount.java
└── resources
├── log4j
└── log4j.properties.txt
TumblingWindow.java
public final class TumblingWindow {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment executionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<String> text = executionEnvironment
.socketTextStream("localhost", 9000, '\n', 6);
final DataStream<WordWithCount> tumblingWordCount = text
.flatMap((FlatMapFunction<String, WordWithCount>) (textStream, wordCountKeyPair) -> {
for (String word : textStream.split("\\W")) {
wordCountKeyPair.collect(new WordWithCount(word, 1L));
}
}, TypeInformation.of(WordWithCount.class))
.keyBy((KeySelector<WordWithCount, String>) wordWithCount -> wordWithCount.word,
TypeInformation.of(String.class))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce((ReduceFunction<WordWithCount>)
(a, b) -> new WordWithCount(a.word, a.count + b.count));
// print the results with a single thread, rather than in parallel
tumblingWordCount.print();
executionEnvironment.execute("Flink Tumbling window Example");
}
}
WordWithCount.java
public class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
Run & Output
mvn clean package
Copy target/tumblingwindow-0.1.jar to flink-1.20.0/examples/jreact
Start local cluster:
./bin/start-cluster.sh
nc -lk 9000
Open a new terminal and run:
./bin/flink run examples/jreact/tumblingwindow-0.1.jar
Insert some data:
// on the nc -lk 9000:
aa
bb
cc
dd
ee
Output
flink-1.20.0/log/flink-*-taskexecutor-*.out:
aa : 1
ee : 1
dd : 1
cc : 1
bb : 1
localhost:8081
