Taxi Ride Generator

Description

This example counts the rides for each driver. Note that it implicitly keeps state for each driver: this sort of simple, non-windowed aggregation over an unbounded set of keys uses an unbounded amount of state. When that is a problem, consider the SQL/Table API, a ProcessFunction, or state TTL, all of which provide mechanisms for expiring state for stale keys.
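The state-growth point can be made concrete without Flink. The sketch below is a hypothetical plain-Java model (TtlRideCounter and its methods are illustrative names, not Flink API) of a per-driver counter whose entries expire after a time-to-live. It mimics the idea behind state TTL, not Flink's actual StateTtlConfig implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model (no Flink) of per-driver counting with a time-to-live.
// It only illustrates the idea behind state TTL; it is not Flink's implementation.
class TtlRideCounter {

    // Per-key state: the current count plus the time the key was last updated.
    private static final class Entry {
        long count;
        long lastUpdate;
    }

    private final long ttlMillis;
    private final Map<Long, Entry> state = new HashMap<>();

    TtlRideCounter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Registers one ride for the driver at time `now` and returns the updated count. */
    long onRide(long driverId, long now) {
        Entry e = state.get(driverId);
        if (e == null || now - e.lastUpdate >= ttlMillis) {
            // No state yet, or the old state has expired: start counting from scratch.
            e = new Entry();
            state.put(driverId, e);
        }
        e.count++;
        e.lastUpdate = now;
        return e.count;
    }

    /** Drops every key whose last update is at least one TTL in the past. */
    void expire(long now) {
        state.values().removeIf(e -> now - e.lastUpdate >= ttlMillis);
    }

    /** Number of keys currently held in state. */
    int keyCount() {
        return state.size();
    }
}
```

Without the expire step, the map gains one entry for every distinct driverId it ever sees, which is the unbounded-state problem described above. In Flink itself, the equivalent is configured on a state descriptor with StateTtlConfig, or implemented with timers in a KeyedProcessFunction.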

Prerequisites

  • JDK 11
  • Maven
  • IDE (IntelliJ IDEA, Eclipse)
  • Flink 1.20.0

Project Structure

.
├── dependency-reduced-pom.xml
├── pom.xml
├── README.md
└── src
    └── main
        ├── java
        │   └── org
        │       └── apache
        │           └── flink
        │               └── training
        │                   ├── examples
        │                   │   └── ridecount
        │                   │       └── RideCountExample.java
        │                   └── exercises
        │                       └── common
        │                           ├── datatypes
        │                           │   ├── RideAndFare.java.bak
        │                           │   ├── TaxiFare.java
        │                           │   └── TaxiRide.java
        │                           ├── sources
        │                           │   ├── TaxiFareGenerator.java
        │                           │   └── TaxiRideGenerator.java
        │                           └── utils
        │                               ├── DataGenerator.java
        │                               ├── GeoUtils.java
        │                               └── MissingSolutionException.java
        └── resources
            ├── log4j
            └── log4j.properties.txt

RideCountExample.java

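package org.apache.flink.training.examples.ridecount;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;

/** Counts the rides for each driver in the stream produced by TaxiRideGenerator. */
public class RideCountExample {

    /**
     * Main method.
     *
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {

        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start the data generator
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());

        // map each ride to a tuple of (driverId, 1)
        DataStream<Tuple2<Long, Long>> tuples =
                rides.map(
                        new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
                            @Override
                            public Tuple2<Long, Long> map(TaxiRide ride) {
                                return Tuple2.of(ride.driverId, 1L);
                            }
                        });

        // partition the stream by the driverId
        KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);

        // count the rides for each driver: sum field 1 per key,
        // emitting the updated total for every incoming ride
        DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);

        // we could, in fact, print out any or all of these streams
        rideCounts.print();

        // run the ride count pipeline
        env.execute("Ride Count");
    }
}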
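To make the semantics of keyBy(t -> t.f0).sum(1) concrete, here is a plain-Java model of the same pipeline, assuming the rides arrive as a simple array of driver IDs (RollingSumModel is a hypothetical name, not part of the project). It shows that sum(1) is a rolling aggregation: it keeps one running total per key and emits the updated tuple for every incoming ride, rather than a single final count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model (no Flink) of
// rides.map(ride -> (driverId, 1)).keyBy(t -> t.f0).sum(1)
class RollingSumModel {

    /** Returns the (driverId, runningCount) pairs emitted for the given rides, in arrival order. */
    static List<long[]> rollingCounts(long[] driverIds) {
        // Keyed state: one running total per driver.
        Map<Long, Long> countByDriver = new HashMap<>();
        List<long[]> emitted = new ArrayList<>();
        for (long driverId : driverIds) {
            // map step: each ride becomes (driverId, 1)
            long one = 1L;
            // keyBy + sum(1): add field 1 to this driver's total and emit the new total
            long updated = countByDriver.merge(driverId, one, Long::sum);
            emitted.add(new long[] {driverId, updated});
        }
        return emitted;
    }
}
```

For rides from drivers 7, 8, 7, 7 the model emits (7,1), (8,1), (7,2), (7,3), which is why the job's output lists the same driver repeatedly with increasing counts.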

Run & Output

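Output when run from IntelliJ IDEA. Each line is an updated (driverId, rideCount) tuple, and the number before ">" identifies the parallel subtask of the print sink that emitted it.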

// ...
15> (2013000123,13)
15> (2013000089,13)
2> (2013000102,15)
1> (2013000042,15)
7> (2013000027,10)
4> (2013000180,10)
14> (2013000052,9)
18> (2013000005,11)
26> (2013000129,10)
// ...
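Because keyBy routes every tuple with the same driverId to the same parallel instance, each driver's running count is maintained by exactly one subtask; in this job, where print follows the keyed sum at the same parallelism, all lines for a given driver should carry the same prefix. The sketch below models that routing in plain Java. It is a simplification: KeyRoutingModel is hypothetical, and it uses Math.floorMod of the key's hashCode, whereas Flink actually hashes keys into key groups (murmur hash modulo the maximum parallelism) and then assigns key groups to subtasks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of key-based routing. Flink's real assignment
// hashes the key into a key group and maps key groups to subtasks; this model
// uses a plain modulo of hashCode instead.
class KeyRoutingModel {

    /** Picks a parallel instance in [0, parallelism) for the given driver ID. */
    static int subtaskFor(long driverId, int parallelism) {
        return Math.floorMod(Long.hashCode(driverId), parallelism);
    }

    /** Routes rides to simulated subtasks and counts rides per driver inside each one. */
    static Map<Integer, Map<Long, Long>> route(long[] driverIds, int parallelism) {
        Map<Integer, Map<Long, Long>> perSubtask = new HashMap<>();
        for (long driverId : driverIds) {
            int subtask = subtaskFor(driverId, parallelism);
            // Each subtask holds the state only for the keys routed to it.
            perSubtask.computeIfAbsent(subtask, s -> new HashMap<>())
                      .merge(driverId, 1L, Long::sum);
        }
        return perSubtask;
    }
}
```

Whatever hash is used, the design point is the same: a deterministic key-to-subtask mapping means no two subtasks ever hold partial counts for the same driver.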

Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/taxi-ride-generator