Fraud Detector
Description
Fraud detection is critical for financial institutions, online marketplaces, and e-commerce platforms. A financial institution wants to develop a real-time fraud detection system that analyzes thousands of transactions per second to detect potential fraud. Apache Flink processes each transaction as an individual event with millisecond-level latency. Because it does not rely on micro-batching, it can deliver results in real time.
Fraud Detection with the DataStream API
Prerequisites
- JDK 11
- Maven
- IDE (IntelliJ IDEA, Eclipse)
- Flink 1.20.0
Project Structure
.
├── dependency-reduced-pom.xml
├── pom.xml
└── src
    └── main
        ├── java
        │   └── spendreport
        │       ├── FraudDetectionJob.java
        │       └── FraudDetector.java
        └── resources
            └── log4j2.properties
FraudDetectionJob.java
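package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}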
The FraudDetectionJob class defines the data flow of the application.
The Execution Environment
The StreamExecutionEnvironment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Creating a Source
Sources ingest data from external systems. This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process. Each transaction contains an account ID (accountId), a timestamp of when the transaction occurred (timestamp), and a US$ amount (amount). The name attached to the source is for debugging only: if something goes wrong, it shows where the error originated.
DataStream<Transaction> transactions = env
    .addSource(new TransactionSource())
    .name("transactions");
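For experimenting outside Flink, the following plain-Java sketch produces a bounded, reproducible sample of events carrying the three fields described above. TransactionEvent and SampleTransactions are hypothetical names; this is not how the walkthrough's TransactionSource is implemented, and unlike a Flink source it stops after a fixed number of events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical event type mirroring the three fields of a transaction.
class TransactionEvent {
    final long accountId;
    final long timestamp; // epoch milliseconds
    final double amount;  // US dollars

    TransactionEvent(long accountId, long timestamp, double amount) {
        this.accountId = accountId;
        this.timestamp = timestamp;
        this.amount = amount;
    }
}

// Hypothetical bounded generator; a real Flink source would keep emitting indefinitely.
class SampleTransactions {
    static List<TransactionEvent> generate(int count, int numAccounts, long startMillis, long seed) {
        Random random = new Random(seed);
        List<TransactionEvent> out = new ArrayList<>(count);
        long ts = startMillis;
        for (int i = 0; i < count; i++) {
            // Account IDs are drawn from 1..numAccounts
            long accountId = 1 + random.nextInt(numAccounts);
            // Amounts are rounded to cents, in the range 0.00 to 1000.00
            double amount = Math.round(random.nextDouble() * 100_000) / 100.0;
            out.add(new TransactionEvent(accountId, ts, amount));
            // Advance by 1..1000 ms so timestamps are strictly increasing
            ts += 1 + random.nextInt(1000);
        }
        return out;
    }
}
```

A fixed seed makes runs repeatable, which helps when debugging a detection rule against the same input.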
Partitioning Events & Detecting Fraud
The transactions stream contains many transactions from a large number of users, so it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
To ensure that the same physical task processes all records for a particular key, you can partition a stream using DataStream#keyBy. The process() call adds an operator that applies a function to each partitioned element in the stream. It is common to say the operator immediately after a keyBy, in this case FraudDetector, is executed within a keyed context.
DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getAccountId)
    .process(new FraudDetector())
    .name("fraud-detector");
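The guarantee keyBy provides can be illustrated with a simplified, hypothetical routing function: any deterministic function of the key sends every transaction for one account to the same task. This is not how Flink assigns keys internally. Flink first hashes the key into one of maxParallelism key groups and then maps ranges of key groups to operator instances. KeyRoutingSketch and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyBy routing (not Flink's key-group assignment).
class KeyRoutingSketch {
    /** Picks a parallel task index for a key: the same key always yields the same index. */
    static int taskFor(long accountId, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes
        return Math.floorMod(Long.hashCode(accountId), parallelism);
    }

    /** Groups account IDs (one per transaction) by the task that would receive them. */
    static Map<Integer, List<Long>> route(List<Long> accountIds, int parallelism) {
        Map<Integer, List<Long>> byTask = new TreeMap<>();
        for (long id : accountIds) {
            byTask.computeIfAbsent(taskFor(id, parallelism), k -> new ArrayList<>()).add(id);
        }
        return byTask;
    }
}
```

Because the routing depends only on the key, per-account state (such as the flag used by FraudDetector) can live entirely on one task without any coordination between tasks.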
Outputting Results
A sink writes a DataStream to an external system, such as Apache Kafka, Cassandra, or AWS Kinesis. The AlertSink logs each Alert record at log level INFO instead of writing it to persistent storage, so you can easily see your results.
alerts.addSink(new AlertSink());
FraudDetector.java
package spendreport;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    private transient ValueState<Boolean> flagState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(OpenContext openContext) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Get the current state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Check if the flag is set
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                collector.collect(alert);
            }
            // Clean up our state
            cleanUp(context);
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // Set the flag to true
            flagState.update(true);

            // Register a processing-time timer one minute from now and remember its timestamp
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // Remove the flag after 1 minute
        timerState.clear();
        flagState.clear();
    }

    private void cleanUp(Context ctx) throws Exception {
        // Delete the timer
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        // Clean up all state
        timerState.clear();
        flagState.clear();
    }
}
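The FraudDetector class defines the business logic of the function that detects fraudulent transactions: it raises an alert for an account when a transaction smaller than $1.00 is immediately followed, for the same account, by a transaction larger than $500.00 within one minute.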
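To see the rule in isolation, the following plain-Java sketch replays it without a Flink cluster. It is a simulation under stated assumptions, not the Flink API: a HashMap keyed by account ID stands in for Flink's keyed ValueState, the caller passes the current time explicitly instead of using a timer service, and a flag whose one-minute timer has passed is treated as if onTimer had already cleared it. FraudRuleSketch and its process method are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the FraudDetector rule (no Flink APIs are used).
// Assumption: the caller supplies the current processing time in milliseconds.
class FraudRuleSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000;

    // Stand-in for keyed state: a key is present while its "small transaction" flag is set,
    // and the value is the time at which the flag's timer would fire (what timerState stores).
    private final Map<Long, Long> flagExpiry = new HashMap<>();

    /** Handles one transaction for an account and returns true if it should raise an alert. */
    boolean process(long accountId, double amount, long nowMillis) {
        Long expiry = flagExpiry.get(accountId);

        // Simulated onTimer: a flag whose timer time has passed counts as already cleared.
        if (expiry != null && nowMillis >= expiry) {
            flagExpiry.remove(accountId);
            expiry = null;
        }

        boolean alert = false;
        if (expiry != null) {
            // Flag is set: alert only if this transaction is large, then clean up either way.
            alert = amount > LARGE_AMOUNT;
            flagExpiry.remove(accountId);
        }

        if (amount < SMALL_AMOUNT) {
            // Set the flag and "register" a timer one minute from now.
            flagExpiry.put(accountId, nowMillis + ONE_MINUTE);
        }
        return alert;
    }
}
```

For example, process(1L, 0.50, 0L) followed by process(1L, 600.00, 10_000L) returns true on the second call, while the same large transaction at 61_000L would return false because the one-minute flag has expired.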
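- open(OpenContext openContext)
open() is inherited from RichFunction and is called once on each parallel instance of the function before any element is processed, which makes it the place to create the state handles.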
To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
@Override
public void open(OpenContext openContext) {
    ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
            "flag",
            Types.BOOLEAN);
    flagState = getRuntimeContext().getState(flagDescriptor);

    ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
            "timer-state",
            Types.LONG);
    timerState = getRuntimeContext().getState(timerDescriptor);
}
- processElement
The fraud detector is implemented as a KeyedProcessFunction. Its KeyedProcessFunction#processElement method is called for every transaction event. This version emits an Alert only when the flag is set (the previous transaction for the account was smaller than SMALL_AMOUNT) and the current transaction is larger than LARGE_AMOUNT. Whenever the flag is set, it then calls cleanUp to clear the flag and its timer.
if (lastTransactionWasSmall != null) {
    if (transaction.getAmount() > LARGE_AMOUNT) {
        // Output an alert downstream
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        collector.collect(alert);
    }
    // Clean up our state
    cleanUp(context);
}
processElement(I value, Context ctx, Collector<O> out) is called with a Context that contains a timer service. The timer service can be used to query the current time, register timers, and delete timers. With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in timerState.
if (transaction.getAmount() < SMALL_AMOUNT) {
    // Set the flag to true
    flagState.update(true);

    // Register a processing-time timer one minute from now and remember its timestamp
    long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
    context.timerService().registerProcessingTimeTimer(timer);
    timerState.update(timer);
}
- onTimer
When a timer fires, Flink calls KeyedProcessFunction#onTimer(long timestamp, OnTimerContext ctx, Collector<O> out). Overriding this method is how you implement the callback that resets the flag.
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
    // Remove the flag after 1 minute
    timerState.clear();
    flagState.clear();
}
- cleanUp
Finally, to cancel the timer, you need to delete the registered timer and clear the timer state. You can wrap this in a helper method, cleanUp(Context ctx), and call it instead of flagState.clear().
private void cleanUp(Context ctx) throws Exception {
    // Delete the timer
    Long timer = timerState.value();
    ctx.timerService().deleteProcessingTimeTimer(timer);

    // Clean up all state
    timerState.clear();
    flagState.clear();
}
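The reason timerState exists is that a processing-time timer is identified by its timestamp: to delete it, you must pass the same timestamp you registered. The sketch below is a hypothetical, single-key stand-in for a timer service. Its method names mirror currentProcessingTime, registerProcessingTimeTimer, and deleteProcessingTimeTimer on Flink's TimerService, but SimulatedTimerService and its advanceTo method are invented here to move simulated time forward. Like Flink, which keeps at most one timer per key and timestamp, it deduplicates timers registered for the same timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-key timer service; timers are identified only by their timestamp.
class SimulatedTimerService {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentTime = 0L;

    long currentProcessingTime() {
        return currentTime;
    }

    // Registering the same timestamp twice leaves a single timer (set semantics).
    void registerProcessingTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Deletion needs the exact timestamp, which is why FraudDetector keeps it in timerState.
    void deleteProcessingTimeTimer(long timestamp) {
        timers.remove(timestamp);
    }

    /** Moves time forward and returns the timestamps of timers that fired, in order. */
    List<Long> advanceTo(long newTime) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= newTime) {
            fired.add(timers.pollFirst());
        }
        currentTime = newTime;
        return fired;
    }
}
```

Registering currentProcessingTime() + 60_000 and then deleting that same value before calling advanceTo(120_000) yields no fired timers, which is the behavior cleanUp relies on.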
Run & Output
12:42:51,996 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
12:42:57,014 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
12:43:02,031 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
12:43:07,048 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
12:43:12,065 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
12:43:17,081 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
12:43:22,096 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}
12:43:27,109 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=3}