Taxi Ride Generator
Description
Example that counts the rides for each driver. Note that this is implicitly keeping state for each driver. This sort of simple, non-windowed aggregation on an unbounded set of keys will use an unbounded amount of state. When this is an issue, look at the SQL/Table API, or ProcessFunction, or state TTL, all of which provide mechanisms for expiring state for stale keys.
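The state growth described above, and the idea behind expiring state for stale keys, can be modeled without Flink. The following is a minimal plain-Java sketch, not Flink's state TTL API: the class ExpiringRideCounter, its ttlMillis parameter, and the explicit timestamps passed by the caller are all hypothetical names chosen for illustration. It keeps one counter per driver and drops counters that have not been updated within the TTL.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Plain-Java model of per-key counting state with a time-to-live.
 * Hypothetical illustration only; this is not Flink's state TTL API.
 */
final class ExpiringRideCounter {

    /** Count plus the time of the last update for one driver. */
    private static final class Entry {
        long count;
        long lastUpdateMillis;
    }

    private final long ttlMillis;
    private final Map<Long, Entry> stateByDriver = new HashMap<>();

    ExpiringRideCounter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Records one ride for the driver and returns the driver's updated count. */
    long add(long driverId, long nowMillis) {
        Entry entry = stateByDriver.get(driverId);
        // Treat an entry older than the TTL as if it had never existed.
        if (entry == null || nowMillis - entry.lastUpdateMillis > ttlMillis) {
            entry = new Entry();
            stateByDriver.put(driverId, entry);
        }
        entry.count++;
        entry.lastUpdateMillis = nowMillis;
        return entry.count;
    }

    /** Removes every entry not updated within the TTL; returns how many were removed. */
    int evictStale(long nowMillis) {
        int removed = 0;
        Iterator<Entry> it = stateByDriver.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().lastUpdateMillis > ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /** Number of drivers currently held in state. */
    int size() {
        return stateByDriver.size();
    }

    public static void main(String[] args) {
        ExpiringRideCounter counter = new ExpiringRideCounter(1_000L);
        counter.add(2013000123L, 0L);
        counter.add(2013000123L, 500L);
        counter.add(2013000089L, 600L);
        System.out.println("drivers in state: " + counter.size());
        // At t=1550, driver 2013000123 was last updated 1050 ms ago and is stale.
        System.out.println("evicted: " + counter.evictStale(1_550L));
        System.out.println("drivers in state: " + counter.size());
    }
}
```

Without the eviction step, the map would hold one entry for every driver ever seen, which is the unbounded state the description warns about. In a real job, Flink's own expiry mechanisms would take the place of this hand-written eviction.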
Prerequisites
- JDK 11
- Maven
- IDE (IntelliJ IDEA, Eclipse)
- Flink 1.20.0
Project Structure
.
├── dependency-reduced-pom.xml
├── pom.xml
├── README.md
└── src
    └── main
        ├── java
        │   └── org
        │       └── apache
        │           └── flink
        │               └── training
        │                   ├── examples
        │                   │   └── ridecount
        │                   │       └── RideCountExample.java
        │                   └── exercises
        │                       └── common
        │                           ├── datatypes
        │                           │   ├── RideAndFare.java.bak
        │                           │   ├── TaxiFare.java
        │                           │   └── TaxiRide.java
        │                           ├── sources
        │                           │   ├── TaxiFareGenerator.java
        │                           │   └── TaxiRideGenerator.java
        │                           └── utils
        │                               ├── DataGenerator.java
        │                               ├── GeoUtils.java
        │                               └── MissingSolutionException.java
        └── resources
            ├── log4j
            └── log4j.properties.txt
RideCountExample.java
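package org.apache.flink.training.examples.ridecount;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;

public class RideCountExample {

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start the data generator
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());

        // map each ride to a tuple of (driverId, 1)
        DataStream<Tuple2<Long, Long>> tuples =
                rides.map(
                        new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
                            @Override
                            public Tuple2<Long, Long> map(TaxiRide ride) {
                                return Tuple2.of(ride.driverId, 1L);
                            }
                        });

        // partition the stream by the driverId
        KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);

        // count the rides for each driver (rolling sum over tuple field 1)
        DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);

        // we could, in fact, print out any or all of these streams
        rideCounts.print();

        // run the ride count pipeline
        env.execute("Ride Count");
    }
}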
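To make the semantics of the pipeline concrete, here is a minimal plain-Java sketch of what the map, keyBy, and sum(1) steps compute together. It is a single-threaded model, not Flink code: the class RunningCountModel and its runningCounts method are hypothetical names, and a HashMap stands in for Flink's keyed state. As with the rolling sum in the example, every incoming ride produces one output record carrying that driver's updated count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Single-threaded model of map -> keyBy -> sum(1) for ride counting.
 * Hypothetical illustration; a HashMap stands in for Flink's keyed state.
 */
final class RunningCountModel {

    /** Returns one "(driverId,count)" record per input ride, in arrival order. */
    static List<String> runningCounts(long[] driverIds) {
        Map<Long, Long> countByDriver = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (long driverId : driverIds) {
            // Equivalent of mapping to (driverId, 1) and adding field 1 to the key's sum.
            long updated = countByDriver.merge(driverId, 1L, Long::sum);
            emitted.add("(" + driverId + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] rides = {2013000123L, 2013000089L, 2013000123L};
        // prints (2013000123,1), (2013000089,1), (2013000123,2) on separate lines
        runningCounts(rides).forEach(System.out::println);
    }
}
```

Unlike this model, a Flink job spreads the keys over parallel subtasks, so records for different drivers may appear interleaved in any order, while the counts for a single driver still increase one step at a time.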
Run & Output
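(IntelliJ IDEA)
Each output line has the form N> (driverId,count): N is the index of the parallel subtask that printed the record, and the tuple holds the driver's running ride count.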
// ...
15> (2013000123,13)
15> (2013000089,13)
2> (2013000102,15)
1> (2013000042,15)
7> (2013000027,10)
4> (2013000180,10)
14> (2013000052,9)
18> (2013000005,11)
26> (2013000129,10)
// ...