Skip to main content

Read-Write Dynamic Table

Description

In Flink SQL, a table is a structured representation of data that can be queried using SQL syntax. Tables can be created from various sources such as streams, files, or other tables. To create a table, you register metadata to connect to the system you want to use. Dynamic tables process streaming data and continuously update their results to reflect changes on input tables.

In the Example we are querying a dynamic table and sending the result of the continuous query to another dynamic table.

querying-dynamic-table-to-another-dynamic-table.png

Prerequisites

  • JDK 11
  • Maven
  • IDE (IntelliJ IDEA, Eclipse)
  • Flink 1.20.0

Project Structure

.
└── read-write-dynamic-table
├── dependency-reduced-pom.xml
├── flink_source.txt // source data for creating dynamic table
├── pom.xml
├── README.md
└── src
└── main
├── java
│   └── com
│   └── jreact
│   └── flink
│   └── ReadWriteDynamicTable.java
└── resources
├── log4j
└── log4j.properties.txt

The source data for creating dynamic table:

Alice; 23
Bob; 657.9
Alice; 55.1

ReadWriteDynamicTable.java

public class ReadWriteDynamicTable {

public static void main(String[] args) throws Exception {

// We define all the parameters that initialize the table environment.
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();

final TableEnvironment tEnv = TableEnvironment.create(settings);

// Create a table and load data into it from a file.
tEnv.executeSql("CREATE TABLE transactions (name STRING, amount DOUBLE) WITH ('connector' = 'filesystem', 'path' = 'flink_source.txt', 'format' = 'csv', 'csv.field-delimiter' = ';')");

// Test select
tEnv.sqlQuery("SELECT COUNT(*) AS transactions_count FROM transactions")
.execute()
.print();

// Save the data to a file.
tEnv.executeSql("CREATE TABLE revenue ("
+ " name STRING, "
+ " total DOUBLE "
+ " ) WITH ( \n"
+ " 'connector'='filesystem', "
+ " 'path'='flink_output', "
+ " 'format'='csv', "
+ " 'sink.partition-commit.delay'='1 s', "
+ " 'sink.partition-commit.policy.kind'='success-file'"
+ " )");

tEnv.executeSql("INSERT INTO revenue SELECT name, SUM(amount) AS total from transactions GROUP BY name");

// Test select
tEnv.sqlQuery("SELECT name, total FROM revenue")
.execute()
.print();

}
}

Run & Output

Run in IDE (in our case IntelliJ IDEA)

+----------------------+
| transactions_count |
+----------------------+
| 3 |
+----------------------+
1 row in set

+--------------------------------+--------------------------------+
| name | total |
+--------------------------------+--------------------------------+
| Alice | 78.1 |
| Bob | 657.9 |
+--------------------------------+--------------------------------+
2 rows in set

mvn clean package

Copy target/read-write-dynamic-table-0.1.jar to flink-1.20.0/examples/jreact

Start local cluster:
./bin/start-cluster.sh

Copy flink_source.txt to flink-1.20.0

Open a new terminal and run:
./bin/flink run examples/jreact/read-write-dynamic-table-0.1.jar

Output

Job has been submitted with JobID 430edc57fcb45f876a9173804b84ca1d
+----------------------+
| transactions_count |
+----------------------+
| 3 |
+----------------------+
1 row in set
Job has been submitted with JobID 7a97e2f88bbc86fa1fc609a015aafa3e
Job has been submitted with JobID 667bb8689c1b4f9899b2625e06a4e5b0
+--------------------------------+--------------------------------+
| name | total |
+--------------------------------+--------------------------------+
| Alice | 78.1 |
| Bob | 657.9 |
+--------------------------------+--------------------------------+
2 rows in set

localhost:8081

read-write-dynamic-table-browse-01.png

Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/read-write-dynamic-table