Windowing
Windows
Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink and how the programmer can make the most of the functionality it offers.
To handle an unbounded stream of incoming messages, Flink uses a concept called windows. The idea is simple: split the elements of the infinite stream into finite groups (windows) and then process these groups independently.
A “Window” defines a finite set of elements on an unbounded stream over which we can apply computations. This set can be based on time, element counts, a combination of counts and time, or some custom logic to assign elements to windows.
The general structure of a windowed Flink program is presented below. The first snippet refers to keyed streams, while the second to non-keyed ones.
Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
With keyBy(...), the stream is not only grouped into finite windows but also split into logical keyed streams, so each window holds only elements that share the same key. Because the keyed streams are independent of each other, their windows can be processed in parallel by different tasks.
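To make the per-key grouping concrete, here is a minimal plain-Java sketch. It is not Flink code: the Event class, the method name, and the choice of fixed-size (tumbling) windows are assumptions made only for illustration. It counts events per key and per window, which is roughly what a keyed window followed by a counting function computes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of keyed windowing; hypothetical names, not Flink's API. */
final class KeyedWindowSketch {

    /** Hypothetical event type: a key plus an event timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestampMillis;

        Event(String key, long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Counts events per key and per fixed-size window; result is key -> (window start -> count). */
    static Map<String, Map<Long, Integer>> countPerKeyAndWindow(List<Event> events, long sizeMillis) {
        Map<String, Map<Long, Integer>> counts = new TreeMap<>();
        for (Event e : events) {
            // Align the timestamp down to the start of the window that contains it.
            long windowStart = e.timestampMillis - Math.floorMod(e.timestampMillis, sizeMillis);
            counts.computeIfAbsent(e.key, k -> new TreeMap<>())
                  .merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1_000), new Event("b", 2_000),
                new Event("a", 4_000), new Event("a", 6_000));
        // Key "a" has two events in the window starting at 0 and one in the window starting at 5000.
        System.out.println(countPerKeyAndWindow(events, 5_000));
    }
}
```

Each key has its own independent set of windows here, which is why the per-key work can be spread across parallel tasks.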

Flink windowing implementation
Flink implements windowing using two main components:
- window assigner: responsible for assigning each event to one or more windows based on some criteria (e.g., time or count)
- window operator: responsible for managing the state of each window (e.g., storing the events or intermediate results), applying triggers and evictors, and executing window functions
Flink’s windowing implementation is based on two key concepts:
- event time: the logical time at which an event occurred in the real world, as opposed to processing time, the time at which an event arrives at a system component
- watermarks: special records that carry information about the progress of event time in a stream. Flink uses watermarks to determine when a window can be closed and processed, based on the assumption that no events with a lower timestamp will arrive after a watermark
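The interplay between watermarks and window firing can be sketched in plain Java. This is a simplified model, not Flink's implementation: the class and method names are hypothetical, and it assumes a bounded-out-of-orderness strategy in which the watermark trails the highest timestamp seen so far by a fixed delay.

```java
/** Simplified model of a bounded-out-of-orderness watermark; hypothetical names, not Flink's API. */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxSeenTimestamp;

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start high enough that the subtraction in currentWatermark() cannot underflow.
        this.maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Records an event timestamp; out-of-order events never move the watermark backwards. */
    void onEvent(long timestampMillis) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestampMillis);
    }

    /** The watermark asserts that no event with a timestamp at or below it is still expected. */
    long currentWatermark() {
        return maxSeenTimestamp - maxOutOfOrdernessMillis - 1;
    }

    /** A window [start, end) can fire once the watermark reaches its last timestamp, end - 1. */
    boolean canFire(long windowEndExclusive) {
        return currentWatermark() >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(2_000);
        w.onEvent(5_000);
        System.out.println(w.canFire(5_000)); // watermark is 2999, so the window [0, 5000) must wait
        w.onEvent(7_000);
        System.out.println(w.canFire(5_000)); // watermark is 4999, so the window [0, 5000) can fire
    }
}
```

A larger out-of-orderness bound tolerates more disorder in the input but delays every window result by the same amount.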
To define a window on a stream, you work with three components:
- window assigner: this determines how the events are assigned to windows. Flink provides built-in window assigners for tumbling, sliding, session, and global windows, which can be used on both keyed and non-keyed streams, and you can define your own custom windows by extending the WindowAssigner class
- window trigger: this determines when a window is ready to be processed and emitted. Every window assigner comes with a default trigger, so specifying one is optional. Flink provides built-in triggers for event time, processing time, and element counts, and you can define your own custom triggers by extending the Trigger class
- window function: this determines how the events in a window are processed and aggregated. Flink supports ReduceFunction and AggregateFunction for incremental aggregation and ProcessWindowFunction for custom logic over all elements of a window, as well as convenience aggregations such as sum, min, and max
In addition to these components, you can also specify two optional components:
- window evictor: this removes elements from a window after the trigger fires, before and/or after the window function is applied. Flink provides built-in evictors for count-based, time-based, and delta-based eviction, and you can define your own custom eviction logic by implementing the Evictor interface
- allowed lateness: this determines how long after the watermark has passed the end of a window late events are still accepted. A late event that arrives within this grace period causes the window to fire again with an updated result; events that arrive later are dropped or, if a side output is configured, sent there
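The allowed-lateness rule from the last bullet can be written down as a small decision function. The following plain-Java sketch uses hypothetical names and is a simplified model of the behavior described above, not Flink's actual operator code. It classifies an element for a window [start, end) given the current watermark.

```java
/** Simplified model of how allowed lateness affects an arriving element; hypothetical names. */
final class LatenessSketch {

    enum Outcome {
        NOT_LATE,                 // the window has not fired yet
        LATE_BUT_ACCEPTED,        // the window already fired, the element triggers another firing
        DROPPED_OR_SIDE_OUTPUT    // the window state is gone, the element cannot be added
    }

    static Outcome classify(long windowEndExclusive, long allowedLatenessMillis, long watermark) {
        long lastTimestamp = windowEndExclusive - 1; // largest timestamp that belongs to the window
        if (watermark < lastTimestamp) {
            return Outcome.NOT_LATE;
        }
        if (watermark < lastTimestamp + allowedLatenessMillis) {
            return Outcome.LATE_BUT_ACCEPTED;
        }
        return Outcome.DROPPED_OR_SIDE_OUTPUT;
    }

    public static void main(String[] args) {
        // Window [0, 5000) with one second of allowed lateness.
        System.out.println(classify(5_000, 1_000, 4_000));
        System.out.println(classify(5_000, 1_000, 5_500));
        System.out.println(classify(5_000, 1_000, 6_000));
    }
}
```

With the default allowed lateness of zero, the middle case disappears: an element is either on time or too late.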
Session Windows
Session windows are dynamically sized windows that group events by sessions of activity. A session window closes when it does not receive elements for a certain period of time, i.e., when a gap of inactivity exceeds a specified threshold. For example, if you define a session window with a gap of 10 minutes, the stream will be divided into windows in which consecutive events are no more than 10 minutes apart.
Session windows do not overlap and do not have a fixed start and end time. A session window assigner can be configured with either a static session gap or with a session gap extractor function which defines how long the period of inactivity is.
The number of elements within a session window is not fixed either, because it depends on how long the user stays active. For example, with a gap of 1 minute, once we have been idle on the Amazon website for more than 1 minute, the previous session ends, and our next click starts a new session, even if it comes only a second after the old session closed. Sessions are thus determined by the pause between one click and the next.

input // a stream of key-value pairs, assumed here to be Tuple2 elements
.keyBy(value -> value.f0) // partition the stream by the first field (key)
.window(EventTimeSessionWindows.withGap(Duration.ofMinutes(5))) // assign a session window with a 5-minute gap duration based on event time
.process(new DeduplicateProcessFunction()) // filter out duplicate values per key in each window using a custom ProcessWindowFunction
.print(); // print the results to standard output
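How a gap turns a sequence of timestamps into sessions can be illustrated with a small plain-Java sketch. The names are hypothetical and the boundary convention is an assumption of this sketch: a new session starts when the pause since the previous event is strictly greater than the gap. Flink's exact behavior for events that are precisely one gap apart should be checked against its documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of gap-based sessions for a single key; not Flink code. */
final class SessionWindowSketch {

    /** Splits timestamps into sessions separated by pauses longer than gapMillis. */
    static List<List<Long>> sessions(long[] timestampsMillis, long gapMillis) {
        long[] sorted = timestampsMillis.clone();
        Arrays.sort(sorted); // sessions are defined over event-time order

        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            boolean pauseTooLong =
                    !current.isEmpty() && ts - current.get(current.size() - 1) > gapMillis;
            if (pauseTooLong) {
                result.add(current);          // close the previous session
                current = new ArrayList<>();  // and start a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] clicks = {0, 30_000, 200_000, 210_000, 85_000};
        // With a 1-minute gap the clicks form two sessions.
        System.out.println(sessions(clicks, 60_000));
    }
}
```

The sketch sorts its input up front because it sees all events at once. A streaming system cannot do that, which is why session windows there are created per element and merged as more elements arrive.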
Sliding Windows
The sliding windows assigner assigns elements to windows of fixed length. It is defined by two time durations: a width for each window and a slide that designates the time between the start of consecutive windows. Typically the slide is less than the width, in which case consecutive windows overlap and an element can be assigned to more than one window. A sliding window can be used to provide a view of data over some lookback time (the width), updated with some frequency (the slide).

An example sliding window with width=5 minutes and slide=3 minutes.
The following code snippets show how to use sliding windows.
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Duration.ofSeconds(10), Duration.ofSeconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Duration.ofSeconds(10), Duration.ofSeconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Duration.ofHours(12), Duration.ofHours(1), Duration.ofHours(-8)))
.<windowed transformation>(<window function>);
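Because sliding windows overlap, one element can belong to several of them. The plain-Java sketch below (hypothetical names, no offset, not Flink's actual code) computes the start times of all windows that contain a given timestamp, using the same width and slide parameters as the snippets above.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of sliding-window assignment; not Flink code. */
final class SlidingWindowSketch {

    /** Returns the start of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMillis, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        // The latest window containing the timestamp starts at the previous multiple of the slide.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, slideMillis);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Width 10 s, slide 5 s: the element at 12 s falls into [10 s, 20 s) and [5 s, 15 s).
        System.out.println(windowStarts(12_000, 10_000, 5_000));
    }
}
```

When the width is a multiple of the slide, every element lands in exactly width / slide windows. Otherwise the number varies: with a width of 5 minutes and a slide of 3 minutes, an element falls into one or two windows depending on its timestamp.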
Tumbling Windows
A tumbling windows assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure.

The following code snippets show how to use tumbling windows.
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Duration.ofDays(1), Duration.ofHours(-8)))
.<windowed transformation>(<window function>);
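The effect of the window size and the optional offset can be seen in a short plain-Java sketch that aligns a timestamp to the start of its tumbling window. The class and method names are hypothetical and the arithmetic is an illustration of the idea, not a copy of Flink's code.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java illustration of tumbling-window alignment with an offset; not Flink code. */
final class TumblingWindowSketch {

    /** Returns the start of the window [start, start + size) that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis, long offsetMillis) {
        // floorMod keeps the remainder non-negative, also for negative timestamps or offsets.
        return timestampMillis - Math.floorMod(timestampMillis - offsetMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long offset = Duration.ofHours(-8).toMillis();

        long ts = Instant.parse("2024-01-01T10:00:00Z").toEpochMilli();
        // Without an offset, daily windows start at midnight UTC.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, day, 0)));
        // With an offset of -8 hours they start at 16:00 UTC, which is midnight in UTC+8.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, day, offset)));
    }
}
```

Without an offset, windows are aligned with the epoch, so a negative offset such as -8 hours is how daily windows are made to start at local midnight in a time zone ahead of UTC.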
Non-Keyed Windows
Non-keyed windows simply split the elements of an infinite stream into finite groups, without first partitioning the stream by key. As a result, all windowing logic is performed by a single task, so the stream is not processed in parallel. The syntax for non-keyed windows is:
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
For non-keyed windows, we just need to call the windowAll(…) method instead of keyBy(…) followed by window(…), as shown below.
// non-keyed window
stream.windowAll(...)