
Sliding Window

Description

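This example counts words read from a local socket (localhost:9000) using Flink sliding processing-time windows. Each window spans 30 seconds, and a new window starts every 10 seconds.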

Prerequisites

  • JDK 11
  • Maven
  • IDE (IntelliJ IDEA, Eclipse)
  • Flink 1.20.0

Project Structure

.
├── dependency-reduced-pom.xml
├── pom.xml
├── README.md
└── src
    └── main
        ├── java
        │   ├── com
        │   │   └── jreact
        │   │       └── flink
        │   │           └── SlidingWindow.java
        │   └── model
        │       └── WordWithCounts.java
        └── resources
            ├── log4j
            └── log4j.properties.txt

SlidingWindow.java

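// Package names follow the directory layout under src/main/java
package com.jreact.flink;

import model.WordWithCounts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingWindow {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment executionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from localhost:9000; the last argument is maxRetry
        final DataStream<String> text = executionEnvironment
                .socketTextStream("localhost", 9000, '\n', 6);

        final DataStream<WordWithCounts> slidingWindowWordCount = text
                // Split each line on runs of non-word characters and emit (word, 1),
                // skipping the empty token that a leading separator produces
                .flatMap((FlatMapFunction<String, WordWithCounts>)
                        (line, out) -> {
                            for (String word : line.split("\\W+")) {
                                if (!word.isEmpty()) {
                                    out.collect(new WordWithCounts(word, 1L));
                                }
                            }
                        }, TypeInformation.of(WordWithCounts.class))
                // Group the stream by word
                .keyBy((KeySelector<WordWithCounts, String>) wordWithCount -> wordWithCount.word,
                        TypeInformation.of(String.class))
                // 30-second windows, with a new window starting every 10 seconds
                .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
                // Sum the counts of each word within a window
                .reduce((ReduceFunction<WordWithCounts>)
                        (a, b) -> new WordWithCounts(a.word, a.count + b.count));

        // print the results with a single thread, rather than in parallel
        slidingWindowWordCount.print().setParallelism(1);

        executionEnvironment.execute("Flink Sliding Window Example");
    }
}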
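The per-window logic of the job above (tokenize, group by word, sum the counts) can be sketched in plain Java without Flink. This is only an illustration of what a single window computes: the class and method names (WindowWordCountSketch, countWords) are hypothetical, and a Map stands in for Flink's keyed state. It splits on runs of non-word characters and skips empty tokens, so blank "words" are never counted.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for what one window computes; no Flink classes involved.
final class WindowWordCountSketch {

    // Tokenizes every line that arrived in one window and sums a count of 1 per word.
    static Map<String, Long> countWords(List<String> linesInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            for (String word : line.split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // a line starting with a separator yields one empty token
                }
                counts.merge(word, 1L, Long::sum); // plays the role of the reduce step
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {aaa=2, bbb=1, ccc=1}
        System.out.println(countWords(List.of("aaa bbb", "aaa, ccc")));
    }
}
```

In the Flink job the same summation runs once per key and per window, so a word typed once contributes a count of 1 to every window that contains it.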

WordWithCounts.java

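// Package name follows the directory layout under src/main/java
package model;

// Simple data holder with public fields and a public no-argument constructor
public class WordWithCounts {

    public String word;
    public long count;

    public WordWithCounts() {}

    public WordWithCounts(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}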

Run & Output

Build the project:
mvn clean package

Copy target/slidingwindow-0.1.jar to flink-1.20.0/examples/jreact.

Start a local cluster (from the flink-1.20.0 directory):
./bin/start-cluster.sh

Start a text server on port 9000:
nc -lk 9000

Open a new terminal and submit the job:
./bin/flink run examples/jreact/slidingwindow-0.1.jar

Type some lines into the terminal running nc -lk 9000:
aaa
bbb
ccc
ddd

Output

The results are written to flink-1.20.0/log/flink-*-taskexecutor-*.out:
aaa : 1
ccc : 1
bbb : 1
ddd : 1
ccc : 1
aaa : 1
bbb : 1
ddd : 1
ccc : 1
bbb : 1
ddd : 1
aaa : 1
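Each word shows up three times with a count of 1 because a 30-second window that slides every 10 seconds overlaps its neighbours: every element falls into 30 / 10 = 3 windows, and each of those windows emits its own result when it closes. The arithmetic can be sketched as follows. This is a simplified illustration, not Flink's implementation; it assumes window starts are aligned to multiples of the slide with no offset, and the names (SlidingWindowAssignmentSketch, windowStarts) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of sliding-window assignment, assuming zero offset.
final class SlidingWindowAssignmentSketch {

    // Returns the start times (in ms) of all windows [start, start + sizeMs)
    // that contain timestampMs, newest window first.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element arriving at t = 25 s with size = 30 s and slide = 10 s
        for (long start : windowStarts(25_000L, 30_000L, 10_000L)) {
            System.out.println("[" + start + ", " + (start + 30_000L) + ")");
        }
        // prints:
        // [20000, 50000)
        // [10000, 40000)
        // [0, 30000)
    }
}
```

Under these assumptions a word typed once is counted in three consecutive windows, which matches the three groups of lines in the output above.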

The Flink web UI is available at localhost:8081.

[Screenshot: sliding-window-browser-01.png]

Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/sliding-window