Postgres to FlinkTable
Description
In the steps described below we will create a batch transformation that will take information from one source (postgresql jdbc table) and write information to another place - Flink table.
Prerequisites
- JDK 11
- Maven
- IDE (IntelliJ IDEA, Eclipse)
- Flink 1.20.0
Project Structure
Setup Data Source
- Create Docker PostgreSQL
version: "3.8"
services:
postgres:
image: postgres:14-alpine
restart: always
environment:
- POSTGRES_USER=root
- POSTGRES_PASSWORD=secret
ports:
- "5432:5432"
volumes:
- db:/var/lib/postgresql/data
volumes:
db:
docker compose -f docker-compose.yaml up
- Create table with data
- Start PostgreSQL terminal:
// docker exec -ti {container} psql -d {db} -U {user}
// in our case:
$ docker exec -ti jdbc-flinktable-postgres-1 psql -d postgres -U root
psql (14.14)
Type "help" for help.
postgres=# - Create table:
CREATE TABLE clients (
id int4 NULL,
"name" varchar NULL
); - Insert data into clients:
INSERT INTO clients (id, name) VALUES (1, 'Alice');
INSERT INTO clients (id, name) VALUES (2, 'Bob'); - Check inserting:
SELECT * FROM clients;
id | name
----+-------
1 | Alice
2 | Bob
(2 rows)
Project Structure
.
├── dependency-reduced-pom.xml
├── docker-compose.yaml
├── pom.xml
├── README.md
└── src
└── main
├── java
│ └── com
│ └── jreact
│ └── flink
│ └── jdbcFlinkTable.java
└── resources
├── log4j
└── log4j.properties.txt
jdbcFlinkTable.java
public class PostgresFlinkTable {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode()
.build();
final TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE DClients ("
+ " id INT, "
+ " name VARCHAR(255) "
+ " ) WITH ( "
+ " 'connector' = 'jdbc', "
+ " 'url' = 'jdbc:postgresql://localhost:5432/postgres', "
+ " 'username' = 'root', "
+ " 'table-name' = 'clients', "
+ " 'password' = 'secret'"
+ ")");
tEnv.sqlQuery("SELECT * FROM DClients")
.execute()
.print();
}
}
Run & Output
+-------------+--------------------------------+
| id | name |
+-------------+--------------------------------+
| 1 | Alice |
| 2 | Bob |
+-------------+--------------------------------+