Skip to main content

Postgres to FlinkTable

Description

In the steps described below we will create a batch transformation that will take information from one source (postgresql jdbc table) and write information to another place - Flink table.

Prerequisites

  • JDK 11
  • Maven
  • IDE (IntelliJ IDEA, Eclipse)
  • Flink 1.20.0

Project Structure

Setup Data Source

- Create Docker PostgreSQL

version: "3.8"
services:
postgres:
image: postgres:14-alpine
restart: always
environment:
- POSTGRES_USER=root
- POSTGRES_PASSWORD=secret
ports:
- "5432:5432"
volumes:
- db:/var/lib/postgresql/data
volumes:
db:
docker compose -f docker-compose.yaml up

- Create table with data

  • Start PostgreSQL terminal:
    // docker exec -ti {container} psql -d {db} -U {user}
    // in our case:
    $ docker exec -ti jdbc-flinktable-postgres-1 psql -d postgres -U root
    psql (14.14)
    Type "help" for help.

    postgres=#
  • Create table:
    CREATE TABLE clients (
    id int4 NULL,
    "name" varchar NULL
    );
  • Insert data into clients:
    INSERT INTO clients (id, name) VALUES (1, 'Alice');
    INSERT INTO clients (id, name) VALUES (2, 'Bob');
  • Check inserting:
    SELECT * FROM clients;
    id | name
    ----+-------
    1 | Alice
    2 | Bob
    (2 rows)

Project Structure

.
├── dependency-reduced-pom.xml
├── docker-compose.yaml
├── pom.xml
├── README.md
└── src
└── main
├── java
│   └── com
│   └── jreact
│   └── flink
│   └── jdbcFlinkTable.java
└── resources
├── log4j
└── log4j.properties.txt

jdbcFlinkTable.java

public class PostgresFlinkTable {

public static void main(String[] args) throws Exception {

EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode()
.build();
final TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.executeSql("CREATE TABLE DClients ("
+ " id INT, "
+ " name VARCHAR(255) "
+ " ) WITH ( "
+ " 'connector' = 'jdbc', "
+ " 'url' = 'jdbc:postgresql://localhost:5432/postgres', "
+ " 'username' = 'root', "
+ " 'table-name' = 'clients', "
+ " 'password' = 'secret'"
+ ")");

tEnv.sqlQuery("SELECT * FROM DClients")
.execute()
.print();

}
}

Run & Output

+-------------+--------------------------------+
| id | name |
+-------------+--------------------------------+
| 1 | Alice |
| 2 | Bob |
+-------------+--------------------------------+

Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/streams/flink/postgres-flinkTable