Sliding Window
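Description
A Flink streaming job that reads lines of text from a socket, splits them into words, and counts each word over a 30-second processing-time window that slides every 10 seconds.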
Prerequisites
- JDK 11
- Maven
- IDE (IntelliJ IDEA, Eclipse)
- Flink 1.20.0
Project Structure
.
├── dependency-reduced-pom.xml
├── pom.xml
├── README.md
└── src
    └── main
        ├── java
        │   ├── com
        │   │   └── jreact
        │   │       └── flink
        │   │           └── SlidingWindow.java
        │   └── model
        │       └── WordWithCounts.java
        └── resources
            ├── log4j
            └── log4j.properties.txt
SlidingWindow.java
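package com.jreact.flink;

import model.WordWithCounts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment executionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Read newline-delimited text from localhost:9000; retry for up to 6 seconds if the socket is down
        final DataStream<String> text = executionEnvironment
                .socketTextStream("localhost", 9000, "\n", 6);
        final DataStream<WordWithCounts> slidingWindowWordCount =
                text.flatMap((FlatMapFunction<String, WordWithCounts>)
                        (textStream, wordCountKeyPair) -> {
                            // Emit (word, 1) for every non-empty word in the line
                            for (String word : textStream.split("\\W+")) {
                                if (!word.isEmpty()) {
                                    wordCountKeyPair.collect(new WordWithCounts(word, 1L));
                                }
                            }
                        }, TypeInformation.of(WordWithCounts.class))
                .keyBy((KeySelector<WordWithCounts, String>) wordWithCount -> wordWithCount.word,
                        TypeInformation.of(String.class))
                // 30-second windows that start every 10 seconds
                .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                // Sum the counts of records that share a word
                .reduce((ReduceFunction<WordWithCounts>)
                        (a, b) -> new WordWithCounts(a.word, a.count + b.count));
        // print the results with a single thread, rather than in parallel
        slidingWindowWordCount.print().setParallelism(1);
        executionEnvironment.execute("Flink Sliding window Example");
    }
}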
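To see why a sliding window of 30 seconds with a 10-second slide puts each record into several windows, the following is a minimal plain-Java sketch of the assignment arithmetic. It is not Flink code: the class and method names are hypothetical, it assumes non-negative timestamps in milliseconds and a zero window offset, and the timestamp of 42 000 ms is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignmentSketch {

    /** Returns the start times (ms) of every sliding window that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that already contains the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 30_000L;  // window length, matching Time.seconds(30)
        long slide = 10_000L; // slide interval, matching Time.seconds(10)
        long timestamp = 42_000L; // hypothetical arrival time of one record
        for (long start : windowStarts(timestamp, size, slide)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
    }
}
```

With these values the record falls into three windows, since size / slide = 3, so its word is reported once by each of them as they fire.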
WordWithCounts.java
package model;

public class WordWithCounts {
    public String word;
    public long count;

    // Flink treats this class as a POJO, which requires a public no-argument constructor
    public WordWithCounts() {}

    public WordWithCounts(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
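The tokenize-then-sum logic that the job applies to the records of one window can be tried without a Flink cluster. The sketch below is a plain-Java approximation under stated assumptions: the class and method names are hypothetical, a map stands in for keyBy plus reduce, and skipping empty tokens is a choice made in this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountReduceSketch {

    /** Splits a line on non-word characters and sums a count of 1 per word. */
    static Map<String, Long> countWords(String line) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : line.split("\\W+")) {
            if (word.isEmpty()) {
                continue; // ignore the empty token produced by leading separators or blank input
            }
            // Same effect as reducing (word, 1) pairs that share a key: a.count + b.count
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("aaa bbb aaa")); // prints {aaa=2, bbb=1}
    }
}
```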
Run & Output
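Build the jar:
mvn clean package
Copy target/slidingwindow-0.1.jar to flink-1.20.0/examples/jreact.
Start the local cluster from the flink-1.20.0 directory:
./bin/start-cluster.sh
Start a socket server on port 9000 for the job to read from:
nc -lk 9000
Open a new terminal and submit the job:
./bin/flink run examples/jreact/slidingwindow-0.1.jar
Type some data into the terminal running nc -lk 9000: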
aaa
bbb
ccc
ddd
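Output
The results are written to flink-1.20.0/log/flink-*-taskexecutor-*.out. Each word appears three times because the 30-second window slides every 10 seconds, so every record belongs to three overlapping windows and each of them emits its own count when it fires. The order of words within one firing is not guaranteed.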
aaa : 1
ccc : 1
bbb : 1
ddd : 1
ccc : 1
aaa : 1
bbb : 1
ddd : 1
ccc : 1
bbb : 1
ddd : 1
aaa : 1
The running job can also be inspected in the Flink web UI at localhost:8081.
