Data Sources

Data Source Concepts

This page describes Flink’s Data Source API and the concepts and architecture behind it. Read this if you are interested in how data sources in Flink work or if you want to implement a new data source.

If you are looking for pre-defined source connectors, please check the Connector Docs.

Overview

Every Flink datastream starts with a Source (or possibly more than one), which is the origin of the data. The data may be created programmatically, read from a file or a database, or consumed from a streaming platform such as Apache Kafka. Depending on the nature of the source, the datastream may be finite or infinite. The difference matters when implementing certain operations, for example those that need to see all of the input, because an infinite stream never reaches an end. This video introduces a few different types of sources and discusses the difference between finite and infinite data streams.

Topics:

  • Sources
  • Finite Sources
  • Infinite Sources
  • Creating programmatic Sources
  • File Sources
  • Kafka Sources
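
The distinction between finite and infinite sources can be illustrated without Flink at all. The sketch below is plain Java that uses `java.util.stream` purely as an analogy; it is not Flink API, and the names `BoundedVsUnbounded`, `finiteSource`, and `infiniteSource` are made up for illustration. A finite source ends on its own, while an infinite one keeps producing elements until the consumer decides to stop.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: plain java.util.stream, not the Flink DataStream API.
public class BoundedVsUnbounded {

    // A finite (bounded) source: it has a last element and ends on its own.
    public static Stream<Integer> finiteSource() {
        return Stream.of(1, 2, 3, 4, 5);
    }

    // An infinite (unbounded) source: there is always a next element.
    public static Stream<Long> infiniteSource() {
        return Stream.iterate(0L, i -> i + 1);
    }

    public static void main(String[] args) {
        // Whole-input operations such as a total are well defined on finite input.
        int sum = finiteSource().mapToInt(Integer::intValue).sum();
        System.out.println("sum of finite source = " + sum);

        // On infinite input the consumer must bound the work itself,
        // here by taking only the first three elements.
        List<Long> firstThree = infiniteSource().limit(3).collect(Collectors.toList());
        System.out.println("first three of infinite source = " + firstThree);
    }
}
```

Operations that need the complete input, such as a grand total, only finish on a finite stream. On an infinite stream they have to be restricted to some bounded portion of the data.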

Code

StreamExecutionEnvironment.fromElements

// Finite stream of five elements; env is the job's StreamExecutionEnvironment.
DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);

DataGeneratorSource

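DataGeneratorSource<String> source =
    new DataGeneratorSource<>(
        index -> "String" + index,        // generator function: maps a record index to a value
        numRecords,                       // total number of records to emit
        RateLimiterStrategy.perSecond(1), // emit at most one record per second
        Types.STRING                      // type information for the generated records
    );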
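
To see what records the generator function above would produce, it can help to evaluate it outside of Flink. The sketch below is plain Java, not the connector itself: it assumes that `DataGeneratorSource` calls the function with consecutive indices starting at 0, and the class name `GeneratorPreview` and the `preview` helper are hypothetical. Rate limiting is deliberately left out.

```java
import java.util.List;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical helper for illustration; this is not part of Flink.
public class GeneratorPreview {

    // Applies the generator to the indices 0 .. numRecords-1 and collects the results.
    // Assumption: the real source feeds the function consecutive indices starting at 0.
    public static <T> List<T> preview(LongFunction<T> generator, long numRecords) {
        return LongStream.range(0, numRecords)
                .mapToObj(generator)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same lambda as in the DataGeneratorSource snippet.
        List<String> records = preview(index -> "String" + index, 3);
        System.out.println(records); // prints [String0, String1, String2]
    }
}
```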

FileSource

FileSource<String> source =
    FileSource.forRecordStreamFormat(
        new TextLineInputFormat(),  // reads the file line by line, one String per line
        new Path("<PATH_TO_FILE>")
    ).build();

KafkaSource

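KafkaSource<String> source = KafkaSource.<String>builder()
    .setProperties(config)                              // Kafka client properties, e.g. bootstrap servers
    .setTopics("topic1", "topic2")                      // topics to subscribe to
    .setValueOnlyDeserializer(new SimpleStringSchema()) // deserialize only the record value, as a String
    .build();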

StreamExecutionEnvironment.fromSource

DataStream<String> stream = env
    .fromSource(
        source,
        WatermarkStrategy.noWatermarks(), // no event-time watermarks are generated
        "my_source"                       // name of the source operator
    );

DataStream.print

stream.print();

Resources