Average Sensor Readings

Description

Our example application ingests a stream of temperature measurements. The program converts each reading from Fahrenheit to Celsius and computes the average temperature per sensor over tumbling 1-second event-time windows.
This example is based on https://github.com/streaming-with-flink/examples-java and was updated by https://jreact.com.
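For reference, each reading is converted with celsius = (fahrenheit - 32) * 5 / 9; for example, a reading of 98.6 °F becomes 37 °C.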

Prerequisites

  • JDK 11
  • Maven
  • IDE
  • Flink 1.20.0

Project Structure

.
├── pom.xml
├── README.md
└── src
    └── main
        ├── java
        │   └── io
        │       └── github
        │           └── streamingwithflink
        │               ├── AverageSensorReadings.java
        │               └── util
        │                   ├── SensorReading.java
        │                   ├── SensorSource.java
        │                   └── SensorTimeAssigner.java
        └── resources
            └── log4j.properties
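
The util classes are not reproduced on this page. For orientation, below is a minimal sketch of the SensorReading POJO as AverageSensorReadings.java uses it (public fields, constructor, and toString() inferred from that code and from the output format shown under Run; the actual class in the repository may differ in details):

package io.github.streamingwithflink.util;

/** POJO holding one sensor measurement: sensor id, timestamp in milliseconds, temperature. */
public class SensorReading {

    public String id;
    public long timestamp;
    public double temperature;

    /** Empty default constructor, required for Flink's POJO serialization. */
    public SensorReading() { }

    public SensorReading(String id, long timestamp, double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "(" + id + ", " + timestamp + ", " + temperature + ")";
    }
}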

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.github.streamingwithflink</groupId>
<artifactId>examples-java</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<name>Java Examples for Stream Processing with Apache Flink</name>
<url>http://streaming-with-flink.github.io/examples</url>

<description>original version updated by jreact.com</description>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.20.0</flink.version>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
<!-- Apache Flink dependencies -->
<!-- In a production build these would typically use 'provided' scope; here they use 'compile' so the job can be run directly from the IDE. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>

<!-- The runtime-web dependency is needed to start the web UI from the IDE -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>

<!-- queryable-state dependencies are needed for respective examples -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-runtime</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>

<!--
Derby is used for a sink connector example.
The example only works in local mode, i.e., it cannot be submitted to a running cluster.
The dependency is set to provided to reduce the size of the JAR file.
-->
<!-- <dependency>-->
<!-- <groupId>org.apache.derby</groupId>-->
<!-- <artifactId>derby</artifactId>-->
<!-- <version>10.13.1.1</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->

<!-- Logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>

<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.apache.flink:flink-shaded-netty</exclude>
<exclude>org.apache.flink:flink-shaded-guava</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>io.github.streamingwithflink.AverageSensorReadings</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
<plugins>

<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime classpath. -->
<!-- Otherwise they would be missing in IntelliJ when the dependencies use 'provided' scope. -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>

<activation>
<property>
<name>idea.version</name>
</property>
</activation>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.13.1.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
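
With this pom.xml the shade plugin builds a self-contained (fat) JAR during the package phase. A minimal sketch of building and submitting the job, assuming a local Flink 1.20 installation and the default Maven artifact name:

mvn clean package
flink run -c io.github.streamingwithflink.AverageSensorReadings target/examples-java-1.0.jar

Alternatively, the job can be started directly from the IDE by running the main() method of AverageSensorReadings.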

AverageSensorReadings.java

/*
* Copyright 2015 Fabian Hueske / Vasia Kalavri
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.streamingwithflink;

import io.github.streamingwithflink.util.SensorReading;
import io.github.streamingwithflink.util.SensorSource;
import io.github.streamingwithflink.util.SensorTimeAssigner;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class AverageSensorReadings {

/**
* main() defines and executes the DataStream program.
*
* @param args program arguments
* @throws Exception
*/
public static void main(String[] args) throws Exception {

// set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// configure watermark interval
env.getConfig().setAutoWatermarkInterval(1000L);

// ingest sensor stream
DataStream<SensorReading> sensorData = env
// SensorSource generates random temperature readings
.addSource(new SensorSource())
// assign timestamps and watermarks which are required for event time
.assignTimestampsAndWatermarks(new SensorTimeAssigner());

DataStream<SensorReading> avgTemp = sensorData
// convert Fahrenheit to Celsius using an inlined map function
.map( r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
// organize stream by sensor
.keyBy(r -> r.id)
// group readings in 1 second windows
.timeWindow(Time.seconds(1))
// compute average temperature using a user-defined function
.apply(new TemperatureAverager());

// print result stream to standard out
avgTemp.print();

// execute application
env.execute("Compute average sensor temperature");
}

/**
* User-defined WindowFunction to compute the average temperature of SensorReadings
*/
public static class TemperatureAverager implements WindowFunction<SensorReading, SensorReading, String, TimeWindow> {

/**
* apply() is invoked once for each window.
*
* @param sensorId the key (sensorId) of the window
* @param window meta data for the window
* @param input an iterable over the collected sensor readings that were assigned to the window
* @param out a collector to emit results from the function
*/
@Override
public void apply(String sensorId, TimeWindow window, Iterable<SensorReading> input, Collector<SensorReading> out) {

// compute the average temperature
int cnt = 0;
double sum = 0.0;
for (SensorReading r : input) {
cnt++;
sum += r.temperature;
}
double avgTemp = sum / cnt;

// emit a SensorReading with the average temperature
out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
}
}
}
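
TemperatureAverager buffers all readings of a window and computes the average only when the window fires. For comparison, the same per-window average can be computed incrementally with Flink's AggregateFunction, which keeps just a running sum and count per window. The sketch below is not part of the original example, and TemperatureAggregate is a hypothetical name:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import io.github.streamingwithflink.util.SensorReading;

// Incrementally maintains (sum, count) per window and emits the average in getResult().
public class TemperatureAggregate implements AggregateFunction<SensorReading, Tuple2<Double, Long>, Double> {

    @Override
    public Tuple2<Double, Long> createAccumulator() {
        return Tuple2.of(0.0, 0L);
    }

    @Override
    public Tuple2<Double, Long> add(SensorReading value, Tuple2<Double, Long> acc) {
        return Tuple2.of(acc.f0 + value.temperature, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

It would replace .apply(new TemperatureAverager()) with .aggregate(new TemperatureAggregate()); note that the result stream would then carry plain Double values instead of SensorReading objects.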

Run

Running main() prints one averaged reading per sensor and window to standard out. The number before '>' is the index of the parallel subtask that emitted the record:

// ...

25> (sensor_19, 1730379578000, 8.133933181102755)
8> (sensor_100, 1730379578000, 18.222523572953563)
18> (sensor_5, 1730379578000, 28.826508003545726)
10> (sensor_8, 1730379578000, 12.114165908495995)
23> (sensor_4, 1730379578000, 17.567941330477613)
25> (sensor_42, 1730379578000, 21.287101421027337)
3> (sensor_55, 1730379578000, 10.962288771915803)
25> (sensor_81, 1730379578000, 6.455762557079783)
22> (sensor_7, 1730379578000, 11.564826056832963)
13> (sensor_12, 1730379578000, 44.793100971361206)
6> (sensor_2, 1730379578000, 25.253219210952036)
7> (sensor_26, 1730379578000, 0.5203865386891062)

// ...

Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/AverageSensorReadings