Stock Price Streams
Description
Here we are interested in the processing part of the pipeline: we consume stock and price streams from Kafka, join them with Flink, and publish the enriched stock updates back to Kafka.

Prerequisites
- JDK 17
- Maven
- IDE (IntelliJ IDEA)
- Flink 1.20.0
- Docker / Docker Desktop
- Docker apache/kafka:3.9.0
Data Setup
docker pull apache/kafka:3.9.0
docker run -p 9092:9092 apache/kafka:3.9.0
docker ps
CONTAINER ID   IMAGE                COMMAND                  CREATED       STATUS       PORTS                    NAMES
5076e71c54cd   apache/kafka:3.9.0   "/__cacert_entrypoin…"   5 hours ago   Up 5 hours   0.0.0.0:9092->9092/tcp   fervent_dijkstra
In our case (above), the container name is fervent_dijkstra.
Create topics:
docker exec -it fervent_dijkstra /bin/bash
cd /opt/kafka
bin/kafka-topics.sh --create --topic price --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic stock --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic stock_update --bootstrap-server localhost:9092
Download sources
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/live-stock-streaming
IntelliJ Setup

Project Structure
.
├── docker-kafka.cmds
├── pom.xml
├── README.md
├── src
│   └── main
│       ├── java
│       │   └── com
│       │       └── abdelrani
│       │           └── stockstreams
│       │               ├── configuration
│       │               │   ├── JacksonConfiguration.java
│       │               │   ├── KafkaProperties.java
│       │               │   └── PropertiesLoader.java
│       │               ├── DataStreamJob.java
│       │               ├── messages
│       │               │   ├── in
│       │               │   │   ├── Price.java
│       │               │   │   └── Stock.java
│       │               │   └── out
│       │               │       └── StockUpdate.java
│       │               ├── operators
│       │               │   └── StockPriceJoiner.java
│       │               ├── serdes
│       │               │   ├── PriceDeserializer.java
│       │               │   ├── StockDeserializer.java
│       │               │   └── StockUpdateSerializer.java
│       │               └── sources
│       │                   └── SourceBuilder.java
│       └── resources
│           └── logback.xml
└── target
Environment variables
KafkaProperties.java
@Data
@Builder
public class KafkaProperties {
    private String bootstrapServers;
    private String topic;
    private String groupId;
    private String clientId;
    private String autoOffsetReset;
}
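For readers unfamiliar with Lombok: @Data generates the getters used later (e.g. getBootstrapServers()), and @Builder generates the fluent builder that PropertiesLoader relies on. A minimal usage sketch (the class name is illustrative; the literal values match the .env file below):

import com.abdelrani.stockstreams.configuration.KafkaProperties;

public class KafkaPropertiesDemo {
    public static void main(String[] args) {
        // Sketch only: the Lombok-generated builder in use.
        KafkaProperties props = KafkaProperties.builder()
                .bootstrapServers("localhost:9092")
                .topic("price")
                .groupId("price_group")
                .clientId("price_client")
                .autoOffsetReset("latest")
                .build();
        System.out.println(props.getTopic()); // "price" (getter generated by @Data)
    }
}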
PropertiesLoader.java
public class PropertiesLoader {

    // Environment variables are captured once, at class-load time.
    private static final Map<String, String> ENVS = System.getenv();

    public KafkaProperties priceKafkaProperties() {
        return KafkaProperties.builder()
                .bootstrapServers(ENVS.get("PRICE_KAFKA_BOOTSTRAP_SERVERS"))
                .topic(ENVS.get("PRICE_KAFKA_TOPIC"))
                .groupId(ENVS.get("PRICE_KAFKA_GROUP_ID"))
                .clientId(ENVS.get("PRICE_KAFKA_CLIENT_ID"))
                .autoOffsetReset(ENVS.get("PRICE_KAFKA_AUTO_OFFSET_RESET"))
                .build();
    }

    public KafkaProperties stockKafkaProperties() {
        return KafkaProperties.builder()
                .bootstrapServers(ENVS.get("STOCK_KAFKA_BOOTSTRAP_SERVERS"))
                .topic(ENVS.get("STOCK_KAFKA_TOPIC"))
                .groupId(ENVS.get("STOCK_KAFKA_GROUP_ID"))
                .clientId(ENVS.get("STOCK_KAFKA_CLIENT_ID"))
                .autoOffsetReset(ENVS.get("STOCK_KAFKA_AUTO_OFFSET_RESET"))
                .build();
    }

    // Producer-side config: no consumer group or client id is needed here.
    public KafkaProperties stockUpdateKafkaProperties() {
        return KafkaProperties.builder()
                .bootstrapServers(ENVS.get("STOCK_UPDATE_KAFKA_BOOTSTRAP_SERVERS"))
                .topic(ENVS.get("STOCK_UPDATE_KAFKA_TOPIC"))
                .autoOffsetReset(ENVS.get("STOCK_UPDATE_KAFKA_AUTO_OFFSET_RESET"))
                .build();
    }
}
.env
# Price config
PRICE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
PRICE_KAFKA_TOPIC=price
PRICE_KAFKA_GROUP_ID=price_group
PRICE_KAFKA_CLIENT_ID=price_client
PRICE_KAFKA_AUTO_OFFSET_RESET=latest
# Stock config
STOCK_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
STOCK_KAFKA_TOPIC=stock
STOCK_KAFKA_GROUP_ID=stock_group
STOCK_KAFKA_CLIENT_ID=stock_client
STOCK_KAFKA_AUTO_OFFSET_RESET=latest
# Stock price updates config
STOCK_UPDATE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
STOCK_UPDATE_KAFKA_TOPIC=stock_update
STOCK_UPDATE_KAFKA_AUTO_OFFSET_RESET=latest
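Note that a plain JVM process does not read a .env file by itself; the variables have to be exported in the shell or set in the IntelliJ run configuration (for example via the EnvFile plugin). Purely as an illustration, a minimal loader for KEY=VALUE files might look like this (the DotEnv class is hypothetical and not part of the project):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not in the repository): parses KEY=VALUE lines from a .env file.
public final class DotEnv {

    public static Map<String, String> load(Path path) throws IOException {
        Map<String, String> vars = new HashMap<>();
        for (String line : Files.readAllLines(path)) {
            line = line.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int eq = line.indexOf('=');
            if (eq > 0) {
                vars.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
            }
        }
        return vars;
    }
}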
Kafka consumers and producers:
SourceBuilder builds the two KafkaSource consumers and the KafkaSink producer. Note that OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST) starts each consumer from its group's committed offsets, falling back to the earliest available offset when none have been committed yet.
public class SourceBuilder {

    public static KafkaSource<Price> priceSource(KafkaProperties kafkaProperties) {
        return KafkaSource.<Price>builder()
                .setBootstrapServers(kafkaProperties.getBootstrapServers())
                .setTopics(kafkaProperties.getTopic())
                .setValueOnlyDeserializer(new PriceDeserializer())
                .setGroupId(kafkaProperties.getGroupId())
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();
    }

    public static KafkaSource<Stock> stockSource(KafkaProperties kafkaProperties) {
        return KafkaSource.<Stock>builder()
                .setBootstrapServers(kafkaProperties.getBootstrapServers())
                .setTopics(kafkaProperties.getTopic())
                .setDeserializer(new StockDeserializer())
                .setGroupId(kafkaProperties.getGroupId())
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setProperty("partition.discovery.interval.ms", "10000") // discover new partitions every 10 seconds
                .build();
    }

    public static KafkaSink<StockUpdate> stockUpdateSink(KafkaProperties kafkaProperties) {
        return KafkaSink.<StockUpdate>builder()
                .setBootstrapServers(kafkaProperties.getBootstrapServers())
                .setRecordSerializer(new StockUpdateSerializer(kafkaProperties.getTopic()))
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // at-least-once delivery
                .build();
    }
}
Deserialization / Serialization
StockDeserializer implements KafkaRecordDeserializationSchema, which exposes the whole ConsumerRecord (key, value, headers), while PriceDeserializer extends AbstractDeserializationSchema, which only sees the value bytes. Both map JSON to POJOs via the shared MAPPER from JacksonConfiguration.
StockDeserializer.java
@Slf4j
public class StockDeserializer implements KafkaRecordDeserializationSchema<Stock> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Stock> out) throws IOException {
        // MAPPER is statically imported from JacksonConfiguration
        Stock message = MAPPER.readValue(record.value(), Stock.class);
        log.info("Received stock with symbol: {}", message.getSymbol());
        out.collect(message);
    }

    @Override
    public TypeInformation<Stock> getProducedType() {
        return TypeInformation.of(Stock.class);
    }
}
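Because the schema depends on the Flink runtime only through the Collector interface, it can be exercised directly, which is handy for debugging the JSON mapping. A sanity-check sketch (the class name and test values are illustrative only; Stock is assumed to expose getSymbol() as used above):

import com.abdelrani.stockstreams.messages.in.Stock;
import com.abdelrani.stockstreams.serdes.StockDeserializer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.ArrayList;
import java.util.List;

public class StockDeserializerDemo {
    public static void main(String[] args) throws Exception {
        List<Stock> results = new ArrayList<>();
        byte[] json = "{\"symbol\":\"GOOG\",\"company_name\":\"Alphabet Inc (Google)\"}".getBytes();
        // Feed a hand-built Kafka record through the deserializer and collect the output.
        new StockDeserializer().deserialize(
                new ConsumerRecord<>("stock", 0, 0L, null, json),
                new Collector<Stock>() {
                    @Override public void collect(Stock stock) { results.add(stock); }
                    @Override public void close() { }
                });
        System.out.println(results.get(0).getSymbol()); // GOOG
    }
}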
PriceDeserializer.java
@Slf4j
public class PriceDeserializer extends AbstractDeserializationSchema<Price> {

    @Override
    public Price deserialize(byte[] message) throws IOException {
        Price priceMessage = MAPPER.readValue(message, Price.class);
        log.info("Received price message for stock: {}", priceMessage.getSymbol());
        return priceMessage;
    }
}
StockUpdateSerializer.java
public class StockUpdateSerializer implements KafkaRecordSerializationSchema<StockUpdate> {

    private final String topic;

    public StockUpdateSerializer(String topic) {
        this.topic = topic;
    }

    @SneakyThrows
    @Nullable
    @Override
    public ProducerRecord<byte[], byte[]> serialize(StockUpdate element, KafkaSinkContext context, Long timestamp) {
        // Keying the record by symbol keeps all updates for a symbol on the same partition.
        byte[] key = element.getSymbol().getBytes();
        byte[] result = MAPPER.writeValueAsBytes(element);
        return new ProducerRecord<>(topic, key, result);
    }
}
JacksonConfiguration.java
public class JacksonConfiguration {

    public static final ObjectMapper MAPPER = defaultObjectMapper();

    private static ObjectMapper defaultObjectMapper() {
        return JsonMapper.builder()
                .addModules(new JavaTimeModule(), new Jdk8Module())
                // the second call overrides the first, so NON_EMPTY is the effective inclusion
                .serializationInclusion(NON_NULL)
                .serializationInclusion(NON_EMPTY)
                .propertyNamingStrategy(new PropertyNamingStrategies.SnakeCaseStrategy())
                .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
                .enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
                .enable(DeserializationFeature.UNWRAP_SINGLE_VALUE_ARRAYS)
                .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
                .disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE)
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
                .build();
    }
}
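The SnakeCaseStrategy is why the Kafka messages below use company_name while the POJO field is companyName. A minimal illustration (the Example record is hypothetical and assumes the project's Jackson version supports records, i.e. 2.12+):

import com.abdelrani.stockstreams.configuration.JacksonConfiguration;

public class NamingDemo {
    // Hypothetical type used only to illustrate the naming strategy.
    public record Example(String companyName) {}

    public static void main(String[] args) throws Exception {
        // camelCase field -> snake_case JSON key
        System.out.println(JacksonConfiguration.MAPPER.writeValueAsString(new Example("Alphabet")));
        // prints: {"company_name":"Alphabet"}
    }
}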
Joining
StockPriceJoiner.java
The StockPriceJoiner class extends RichCoFlatMapFunction, a function that consumes two input streams and produces a single output stream: flatMap1 is called for each element of the first stream (Stock), and flatMap2 for each element of the second (Price). Because the connected streams are keyed by symbol, the ValueState below is scoped per key: flatMap2 stores the latest price for a symbol, and flatMap1 emits a joined StockUpdate whenever a stock message arrives and a price is already in state.
@Slf4j
public class StockPriceJoiner extends RichCoFlatMapFunction<Stock, Price, StockUpdate> {

    /**
     * The state maintained by this function: the latest price per symbol.
     */
    private ValueState<Price> priceState;

    @Override
    public void open(Configuration parameters) throws Exception {
        priceState = getRuntimeContext().getState(new ValueStateDescriptor<>("price", Price.class));
    }

    @Override
    public void flatMap1(Stock stock, Collector<StockUpdate> out) throws Exception {
        Price price = priceState.value();
        if ((price != null) && (price.getPrice() != null)) {
            log.info("Joining stock: {} with price: {}", stock.getSymbol(), price.getPrice());
            out.collect(StockUpdate.builder()
                    .symbol(stock.getSymbol())
                    .price(price.getPrice())
                    .companyName(stock.getCompanyName())
                    .timestamp(price.getTimestamp())
                    .build());
        }
    }

    @Override
    public void flatMap2(Price value, Collector<StockUpdate> out) throws Exception {
        priceState.update(value);
    }
}
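As written, the join is asymmetric: an update is emitted only when a stock message arrives and a price is already in state; a new price alone produces no output. If an update on every price tick is desired, a symmetric variant would also keep the last Stock per symbol in state and emit from flatMap2. A sketch (hypothetical, not part of the project):

// Sketch of a symmetric variant (not in the repository).
// Requires a second piece of keyed state, initialized in open() alongside priceState:
//   stockState = getRuntimeContext().getState(new ValueStateDescriptor<>("stock", Stock.class));
// and flatMap1 must also call stockState.update(stock).
private ValueState<Stock> stockState;

@Override
public void flatMap2(Price price, Collector<StockUpdate> out) throws Exception {
    priceState.update(price);
    Stock stock = stockState.value();
    if ((stock != null) && (price.getPrice() != null)) {
        out.collect(StockUpdate.builder()
                .symbol(stock.getSymbol())
                .price(price.getPrice())
                .companyName(stock.getCompanyName())
                .timestamp(price.getTimestamp())
                .build());
    }
}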
Main class
DataStreamJob.java
@Slf4j
public class DataStreamJob {

    private static final Set<String> ALLOWED_STOCKS = Set.of("AAPL", "GOOG", "AMZN", "MSFT", "TSLA");

    public static void main(String[] args) throws Exception {
        // Set up the execution environment, the main entry point to building Flink applications.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure the environment
        configureEnvironment(env);
        // Load properties
        PropertiesLoader propertiesLoader = new PropertiesLoader();
        // Create the execution plan
        DataStreamSource<Stock> stockDataStream = env.fromSource(
                SourceBuilder.stockSource(propertiesLoader.stockKafkaProperties()),
                WatermarkStrategy.noWatermarks(),
                "stock-source");
        DataStreamSource<Price> priceDataStream = env.fromSource(
                SourceBuilder.priceSource(propertiesLoader.priceKafkaProperties()),
                WatermarkStrategy.noWatermarks(),
                "price-source");
        // Send the result to a Kafka topic using KafkaSink
        KafkaSink<StockUpdate> stockUpdateKafkaSink = SourceBuilder.stockUpdateSink(propertiesLoader.stockUpdateKafkaProperties());
        stockDataStream
                .filter(stock -> ALLOWED_STOCKS.contains(stock.getSymbol()))
                .connect(priceDataStream)
                .keyBy(Stock::getSymbol, Price::getSymbol)
                .flatMap(new StockPriceJoiner())
                .name("join-stock-price")
                .map(stockUpdate -> {
                    log.info("stock price update: {}", stockUpdate);
                    return stockUpdate;
                })
                .sinkTo(stockUpdateKafkaSink);
        // Execute the program, beginning computation.
        env.execute("StockPriceStreamJob");
    }

    private static void configureEnvironment(StreamExecutionEnvironment env) {
        // Automatically recover from failures
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.seconds(10) // delay between attempts
        ));
        // Disable the Kryo fallback so POJO types use the more efficient PojoSerializer
        env.getConfig().disableGenericTypes();
        // Configure the task manager
        env.configure(getConfiguration());
    }

    // Task manager configuration
    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.setString("taskmanager.cpu.cores", "4");
        config.setString("taskmanager.memory.task.heap.size", "1024m");
        return config;
    }
}
Run
Run the DataStreamJob main class (make sure the environment variables from .env are set in the run configuration).
Produce a stock message:
bin/kafka-console-producer.sh --topic stock --bootstrap-server localhost:9092
>{ "symbol": "GOOG", "company_name": "Alphabet Inc (Google)" }
Produce a price message:
bin/kafka-console-producer.sh --topic price --bootstrap-server localhost:9092
>{ "symbol": "GOOG", "timestamp": "1712443970", "price": 1.5 }
Outputs
IntelliJ log:
[Source: stock-source -> Filter (11/28)#0] INFO c.a.s.serdes.StockDeserializer - Received stock with symbol: GOOG
[Source: price-source (4/28)#0] INFO c.a.s.serdes.PriceDeserializer - Received price message for stock: GOOG
[Source: stock-source -> Filter (11/28)#0] INFO c.a.s.serdes.StockDeserializer - Received stock with symbol: GOOG
[join-stock-price -> Map -> Sink: Writer -> Sink: Committer (18/28)#0] INFO c.a.s.operators.StockPriceJoiner - Joining stock: GOOG with price: 1.5
[join-stock-price -> Map -> Sink: Writer -> Sink: Committer (18/28)#0] INFO c.a.stockstreams.DataStreamJob - stock price update: StockUpdate(symbol=GOOG, companyName=Alphabet Inc (Google), timestamp=1712443970, price=1.5)
Kafka:
bin/kafka-console-consumer.sh --topic stock_update --from-beginning --bootstrap-server localhost:9092
{"symbol":"GOOG","timestamp":"1712443970","price":1.5}
Example based on:
https://abdelrani.com/blog/streaming-data-using-apache-flink