Session Window
Description
Prerequisites
- JDK 11
- Maven
- IDE (IntelliJ IDEA, Eclipse)
- Flink 1.20.0
Project Structure
.
├── dependency-reduced-pom.xml
├── pom.xml
├── README.md
└── src
└── main
├── java
│ └── com
│ └── jreact
│ └── flink
│ └── sessionwindow
│ └── SessionWindow.java
└── resources
├── log4j
└── log4j.properties.txt
SessionWindow.java
public class SessionWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = executionEnvironment
.socketTextStream("localhost", 9000, '\n', 6);
DataStream<Tuple3<String,String, Double>> userClickStream = text.map(row -> {
String[] fields = row.split(",");
if (fields.length == 3) {
return new Tuple3<>(
fields[0],
fields[1],
Double.parseDouble(fields[2])
);
}
throw new Exception("Not valid arg passed");
}, TypeInformation.of(new TypeHint<Tuple3<String, String, Double>>() {
}));
DataStream<Tuple3<String, String, Double>> maxPageVisitTime =
userClickStream.keyBy(((KeySelector<Tuple3<String, String, Double>,
Tuple2<String, String>>) stringStringDoubleTuple3 ->
new Tuple2<>(stringStringDoubleTuple3.f0, stringStringDoubleTuple3.f1)),
TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
}))
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.max(2);
maxPageVisitTime.print();
executionEnvironment.execute("Session window example.");
}
}
Run & Output
mvn clean package
Copy target/sessionwindow-0.1.jar to flink-1.20.0/examples/jreact
Start local cluster:
./bin/start-cluster.sh
Open the terminal and run below command to start a socket window:
nc -lk 9000
Open a new terminal and run:
./bin/flink run examples/jreact/sessionwindow-0.1.jar
Insert some data:
// on the nc -lk 9000:
aaa,bbb,1
bbb,ccc,2
ccc,ddd,3
Output
flink-1.20.0/log/flink-*-taskexecutor-*.out:
(aaa,bbb,1.0)
(bbb,ccc,2.0)
(ccc,ddd,3.0)