Read-Write Dynamic Table
Description
In Flink SQL, a table is a structured representation of data that can be queried with SQL. Tables can be created from sources such as streams, files, or other tables. To create a table, you register metadata that tells Flink how to connect to the external system holding the data. Dynamic tables change over time: a continuous query over a dynamic table keeps updating its result as the input tables change.
This example queries one dynamic table and writes the result of the query to another dynamic table. The code runs in batch mode (streaming mode is commented out), so the query is evaluated once over the bounded input file.
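The idea of a continuously updated result can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink code: it assumes a running SUM(amount) grouped by name and models the result as a changelog using the +I (insert), -U (retract old value) and +U (new value) row kinds that Flink uses for updating results. The class name ChangelogSketch and the helper apply are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; it does not use any Flink API.
class ChangelogSketch {

    // Applies one input row to the running totals and returns the
    // changelog entries that describe how the result table changed.
    static List<String> apply(Map<String, Double> totals, String name, double amount) {
        List<String> changes = new ArrayList<>();
        Double previous = totals.get(name);
        double updated = (previous == null ? 0.0 : previous) + amount;
        if (previous == null) {
            // First row for this name: the result gains a new row.
            changes.add("+I[" + name + ", " + updated + "]");
        } else {
            // Existing name: retract the old total, then emit the new one.
            changes.add("-U[" + name + ", " + previous + "]");
            changes.add("+U[" + name + ", " + updated + "]");
        }
        totals.put(name, updated);
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Double> totals = new LinkedHashMap<>();
        Object[][] rows = {{"Alice", 23.0}, {"Bob", 657.9}, {"Alice", 55.1}};
        for (Object[] row : rows) {
            apply(totals, (String) row[0], (Double) row[1]).forEach(System.out::println);
        }
        // Final state of the result table after all rows were applied.
        System.out.println(totals);
    }
}
```

Each new name produces a single insert, while the second Alice row produces a retraction of her old total followed by the new total. In batch mode only the final state is produced, which is what the output tables in this README show.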

Prerequisites
- JDK 11
- Maven
- IDE (IntelliJ IDEA, Eclipse)
- Flink 1.20.0
Project Structure
.
└── read-write-dynamic-table
    ├── dependency-reduced-pom.xml
    ├── flink_source.txt          // source data for creating the dynamic table
    ├── pom.xml
    ├── README.md
    └── src
        └── main
            ├── java
            │   └── com
            │       └── jreact
            │           └── flink
            │               └── ReadWriteDynamicTable.java
            └── resources
                ├── log4j
                └── log4j.properties.txt
flink_source.txt
The source data for the dynamic table, one transaction per line with the fields separated by a semicolon:
Alice; 23
Bob; 657.9
Alice; 55.1
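To make the mapping from these lines to the (name STRING, amount DOUBLE) schema concrete, here is a small plain-Java sketch that splits each line on the semicolon and converts the second field to a double. It is only an illustration and not how Flink's CSV format is implemented; the class SourceParseSketch, the nested Transaction type, and the choice to trim whitespace around fields are assumptions of this sketch.

```java
import java.util.List;

// Hypothetical illustration class; it does not use any Flink API.
class SourceParseSketch {

    // One typed row, mirroring the columns (name STRING, amount DOUBLE).
    static final class Transaction {
        final String name;
        final double amount;

        Transaction(String name, double amount) {
            this.name = name;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Transaction{name='" + name + "', amount=" + amount + "}";
        }
    }

    // Splits a line such as "Alice; 23" on ';' and converts the fields.
    // Trimming the fields is a choice made for this sketch.
    static Transaction parse(String line) {
        String[] fields = line.split(";", -1);
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected 2 fields but got " + fields.length + ": " + line);
        }
        return new Transaction(fields[0].trim(), Double.parseDouble(fields[1].trim()));
    }

    public static void main(String[] args) {
        List<String> lines = List.of("Alice; 23", "Bob; 657.9", "Alice; 55.1");
        for (String line : lines) {
            System.out.println(parse(line));
        }
    }
}
```

The integer-looking value 23 becomes the double 23.0, which is why the schema can declare amount as DOUBLE for all three rows.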
ReadWriteDynamicTable.java
package com.jreact.flink;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadWriteDynamicTable {
    public static void main(String[] args) throws Exception {
        // Define the settings that initialize the table environment.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register the source table, backed by a semicolon-delimited file.
        tEnv.executeSql("CREATE TABLE transactions (name STRING, amount DOUBLE) WITH ('connector' = 'filesystem', 'path' = 'flink_source.txt', 'format' = 'csv', 'csv.field-delimiter' = ';')");

        // Test select: count the rows read from the source table.
        tEnv.sqlQuery("SELECT COUNT(*) AS transactions_count FROM transactions")
                .execute()
                .print();

        // Register the sink table; its rows are written as CSV files under 'flink_output'.
        tEnv.executeSql("CREATE TABLE revenue ("
                + " name STRING, "
                + " total DOUBLE "
                + " ) WITH ( "
                + " 'connector'='filesystem', "
                + " 'path'='flink_output', "
                + " 'format'='csv', "
                + " 'sink.partition-commit.delay'='1 s', "
                + " 'sink.partition-commit.policy.kind'='success-file'"
                + " )");

        // executeSql submits the INSERT job asynchronously; await() blocks until
        // it finishes, so the select below reads the complete result.
        tEnv.executeSql("INSERT INTO revenue SELECT name, SUM(amount) AS total FROM transactions GROUP BY name")
                .await();

        // Test select: read the sink table back to check what was written.
        tEnv.sqlQuery("SELECT name, total FROM revenue")
                .execute()
                .print();
    }
}
Run & Output
Run in IDE (in our case IntelliJ IDEA)
+----------------------+
|   transactions_count |
+----------------------+
|                    3 |
+----------------------+
1 row in set
+--------------------------------+--------------------------------+
|                           name |                          total |
+--------------------------------+--------------------------------+
|                          Alice |                           78.1 |
|                            Bob |                          657.9 |
+--------------------------------+--------------------------------+
2 rows in set
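Besides querying the revenue table, the written data can be checked directly on disk, since 'path'='flink_output' names a directory that the filesystem connector fills with data files. The following is a rough plain-Java sketch for printing those rows. It assumes that files whose names start with "." (in-progress files) or "_" (marker files such as a success file) are not data files and skips them; the class OutputReaderSketch and its helpers are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for inspecting the sink directory; not part of the project.
class OutputReaderSketch {

    // Assumption of this sketch: names starting with '.' or '_' are
    // in-progress or marker files rather than finished data files.
    static boolean isDataFile(Path path) {
        String name = path.getFileName().toString();
        return Files.isRegularFile(path) && !name.startsWith(".") && !name.startsWith("_");
    }

    // Collects all non-blank lines from the data files directly inside dir.
    static List<String> readRows(Path dir) throws IOException {
        List<Path> files;
        try (Stream<Path> entries = Files.list(dir)) {
            files = entries
                    .filter(OutputReaderSketch::isDataFile)
                    .sorted()
                    .collect(Collectors.toList());
        }
        List<String> rows = new ArrayList<>();
        for (Path file : files) {
            for (String line : Files.readAllLines(file)) {
                if (!line.isBlank()) {
                    rows.add(line);
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the sink path used in ReadWriteDynamicTable.java.
        Path dir = Path.of(args.length > 0 ? args[0] : "flink_output");
        if (!Files.isDirectory(dir)) {
            System.out.println("No output directory at " + dir.toAbsolutePath());
            return;
        }
        readRows(dir).forEach(System.out::println);
    }
}
```

Run it from the directory that contains flink_output, or pass the directory as the first argument.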
Run in flink-1.20.0
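Build the jar:
mvn clean package
Copy target/read-write-dynamic-table-0.1.jar to flink-1.20.0/examples/jreact.
Start a local cluster from the flink-1.20.0 directory:
./bin/start-cluster.sh
Copy flink_source.txt to flink-1.20.0, because the job refers to it by the relative path 'flink_source.txt'.
Open a new terminal in flink-1.20.0 and run:
./bin/flink run examples/jreact/read-write-dynamic-table-0.1.jar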
Output
Job has been submitted with JobID 430edc57fcb45f876a9173804b84ca1d
+----------------------+
|   transactions_count |
+----------------------+
|                    3 |
+----------------------+
1 row in set
Job has been submitted with JobID 7a97e2f88bbc86fa1fc609a015aafa3e
Job has been submitted with JobID 667bb8689c1b4f9899b2625e06a4e5b0
+--------------------------------+--------------------------------+
|                           name |                          total |
+--------------------------------+--------------------------------+
|                          Alice |                           78.1 |
|                            Bob |                          657.9 |
+--------------------------------+--------------------------------+
2 rows in set
The submitted jobs can also be inspected in the Flink Web UI at localhost:8081.
