
Session Window

Description

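This example reads comma-separated lines from a socket, keys them by the first two fields, groups each key's records into processing-time session windows with a 10-second gap, and emits the maximum of the third field for each session.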

Prerequisites

  • JDK 11
  • Maven
  • IDE (IntelliJ IDEA, Eclipse)
  • Flink 1.20.0

Project Structure

.
├── dependency-reduced-pom.xml
├── pom.xml
├── README.md
└── src
    └── main
        ├── java
        │   └── com
        │       └── jreact
        │           └── flink
        │               └── sessionwindow
        │                   └── SessionWindow.java
        └── resources
            ├── log4j
            └── log4j.properties.txt

SessionWindow.java

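package com.jreact.flink.sessionwindow;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionWindow {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment executionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from the socket; retry the connection for up to 6 seconds.
        DataStream<String> text = executionEnvironment
                .socketTextStream("localhost", 9000, '\n', 6);

        // Parse each "field0,field1,number" line into a Tuple3. A malformed line fails the job.
        DataStream<Tuple3<String, String, Double>> userClickStream = text.map(row -> {
            String[] fields = row.split(",");
            if (fields.length == 3) {
                return new Tuple3<>(
                        fields[0],
                        fields[1],
                        Double.parseDouble(fields[2])
                );
            }
            throw new IllegalArgumentException("Expected 3 comma-separated fields: " + row);
        }, TypeInformation.of(new TypeHint<Tuple3<String, String, Double>>() {
        }));

        // Key by (f0, f1), open a session per key that closes after 10 seconds
        // without new records, and keep the maximum of field 2 within each session.
        DataStream<Tuple3<String, String, Double>> maxPageVisitTime =
                userClickStream.keyBy(((KeySelector<Tuple3<String, String, Double>,
                                Tuple2<String, String>>) stringStringDoubleTuple3 ->
                                new Tuple2<>(stringStringDoubleTuple3.f0, stringStringDoubleTuple3.f1)),
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
                        }))
                        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                        .max(2);

        maxPageVisitTime.print();

        executionEnvironment.execute("Session window example.");
    }
}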
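To make the session-gap behaviour easier to picture, here is a minimal plain-Java sketch (no Flink involved) that splits the time-ordered events of a single key into sessions and keeps the maximum value of each one. The class `SessionGapSketch`, the `Event` type, and the method `sessionMaxima` are hypothetical names introduced only for this illustration. The sketch assumes events arrive in time order and treats a pause strictly longer than the gap as a session boundary, which simplifies how Flink assigns and merges windows.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionGapSketch {

    /** Hypothetical event type: arrival time in milliseconds and the value to aggregate. */
    static final class Event {
        final long timeMillis;
        final double value;

        Event(long timeMillis, double value) {
            this.timeMillis = timeMillis;
            this.value = value;
        }
    }

    /**
     * Splits the time-ordered events of ONE key into sessions and returns the
     * maximum value of each session. A new session starts when the pause since
     * the previous event is longer than gapMillis (a simplifying assumption).
     */
    static List<Double> sessionMaxima(List<Event> events, long gapMillis) {
        List<Double> maxima = new ArrayList<>();
        if (events.isEmpty()) {
            return maxima;
        }
        long previousTime = events.get(0).timeMillis;
        double currentMax = events.get(0).value;
        for (Event event : events.subList(1, events.size())) {
            if (event.timeMillis - previousTime > gapMillis) {
                // The pause exceeded the gap: close the session and start a new one.
                maxima.add(currentMax);
                currentMax = event.value;
            } else {
                currentMax = Math.max(currentMax, event.value);
            }
            previousTime = event.timeMillis;
        }
        maxima.add(currentMax); // close the last open session
        return maxima;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(0, 1.0),
                new Event(4_000, 5.0),   // 4 s after the previous event: same session
                new Event(9_000, 2.0),   // 5 s later: still the same session
                new Event(30_000, 3.0)); // 21 s later: a new session
        System.out.println(sessionMaxima(events, 10_000)); // prints [5.0, 3.0]
    }
}
```

Note that the session is extended by every new event, so its total length is not bounded by the gap; only the pause between consecutive events matters.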

Run & Output

mvn clean package

Copy target/sessionwindow-0.1.jar to flink-1.20.0/examples/jreact

Start a local cluster from the flink-1.20.0 directory:
./bin/start-cluster.sh

Open a terminal and run the command below to start a socket listener on port 9000:
nc -lk 9000

Open a new terminal and run:
./bin/flink run examples/jreact/sessionwindow-0.1.jar

Type some data into the terminal running nc -lk 9000:

aaa,bbb,1
bbb,ccc,2
ccc,ddd,3
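Each input line must have exactly three comma-separated fields with a numeric third field; anything else makes the map function throw, which fails the job. As a possible alternative, the sketch below shows a tolerant parser in plain Java that returns an empty `Optional` for malformed lines instead of throwing. The class `ClickLineParser`, the `Click` type, and the field names `user`, `page`, and `visitTime` are assumptions inferred from the variable names `userClickStream` and `maxPageVisitTime`; the original code does not name the fields.

```java
import java.util.Optional;

public class ClickLineParser {

    /** Hypothetical parsed form of one input line; the field names are assumptions. */
    static final class Click {
        final String user;
        final String page;
        final double visitTime;

        Click(String user, String page, double visitTime) {
            this.user = user;
            this.page = page;
            this.visitTime = visitTime;
        }
    }

    /** Returns the parsed line, or Optional.empty() if it is not "text,text,number". */
    static Optional<Click> parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            return Optional.empty();
        }
        try {
            double visitTime = Double.parseDouble(fields[2].trim());
            return Optional.of(new Click(fields[0].trim(), fields[1].trim(), visitTime));
        } catch (NumberFormatException e) {
            // The third field is not a number: treat the line as malformed.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("aaa,bbb,1").isPresent());   // prints true
        System.out.println(parse("aaa,bbb").isPresent());     // prints false
        System.out.println(parse("aaa,bbb,xyz").isPresent()); // prints false
    }
}
```

In the Flink job, a parser like this could be called from a flatMap that emits only the valid records, so a single bad line would not stop the stream.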

Output

About 10 seconds after the last line for each key, the results appear in flink-1.20.0/log/flink-*-taskexecutor-*.out:

(aaa,bbb,1.0)
(bbb,ccc,2.0)
(ccc,ddd,3.0)
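The three sample lines have three different (first field, second field) keys, so each one forms its own session and is emitted unchanged, with the third field printed as a Double. The plain-Java sketch below mimics only the keying and maximum step, under the assumption that all lines of a key fall into one session. The class `MaxPerKeySketch` and the method `maxPerKey` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MaxPerKeySketch {

    /**
     * Groups "a,b,number" lines by their first two fields and returns one
     * "(a,b,max)" string per key, in order of first appearance.
     * Assumes every line is well formed and all lines of a key share a session.
     */
    static List<String> maxPerKey(List<String> lines) {
        Map<String, Double> maxByKey = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String key = fields[0] + "," + fields[1];
            // Keep the larger of the stored value and the new value for this key.
            maxByKey.merge(key, Double.parseDouble(fields[2]), Double::max);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Double> entry : maxByKey.entrySet()) {
            result.add("(" + entry.getKey() + "," + entry.getValue() + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        // Distinct keys: every line is its own result.
        System.out.println(maxPerKey(List.of("aaa,bbb,1", "bbb,ccc,2", "ccc,ddd,3")));
        // prints [(aaa,bbb,1.0), (bbb,ccc,2.0), (ccc,ddd,3.0)]

        // Same key twice: only the maximum survives.
        System.out.println(maxPerKey(List.of("aaa,bbb,1", "aaa,bbb,7")));
        // prints [(aaa,bbb,7.0)]
    }
}
```

To see an actual aggregation in the running job, send two lines with the same first two fields within 10 seconds of each other.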

Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/session-window