Our business requires real-time synchronization of PostgreSQL data, so we chose to implement it with Flink CDC.

Architecture diagram:

Prerequisite steps:

1. Edit the configuration file postgresql.conf

# Change the WAL level to logical
wal_level = logical             # minimal, replica, or logical

# Increase the maximum number of replication slots (default is 10); by default Flink CDC uses one slot per table
max_replication_slots = 20      # max number of replication slots

# Increase the maximum number of WAL sender processes (default is 10); keep this in line with max_replication_slots above
max_wal_senders = 20            # max number of walsender processes

# Terminate replication connections that have been inactive longer than this; it is safer to set it a bit higher (default 60s)
wal_sender_timeout = 180s       # in milliseconds; 0 disables

Changes to postgresql.conf require a restart of the PostgreSQL service to take effect, so they are usually made during an off-peak period.
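
After the restart you can confirm that the new values are active; a minimal check from any SQL client (the parameters are the same ones edited above):

-- Verify the settings from postgresql.conf after the restart
SHOW wal_level;             -- expect: logical
SHOW max_replication_slots; -- expect: 20
SHOW max_wal_senders;       -- expect: 20
SHOW wal_sender_timeout;    -- expect: 3min (180s)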

2. Create a new user and grant it replication privileges

-- Create a new PostgreSQL user
CREATE USER user WITH PASSWORD 'pwd';
-- Grant the user the replication privilege
ALTER ROLE user replication;
-- Allow the user to connect to the database
grant CONNECT ON DATABASE test to user;
-- Grant the user SELECT on all tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
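
To double-check the grants, you can inspect the role's attributes in the pg_roles catalog (user here is the same placeholder name as above):

-- Verify that the role exists, can log in, and has the replication attribute
SELECT rolname, rolcanlogin, rolreplication FROM pg_roles WHERE rolname = 'user';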

3. Publish the tables

-- Set puballtables to true for the publication
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
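
To confirm the publication itself (not just the tables it contains), you can also query the pg_publication catalog; puballtables should show t:

-- Confirm the publication exists and covers all tables
SELECT pubname, puballtables FROM pg_publication;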

4. Change the table's replica identity so that updates and deletes carry the old values

-- Make the replica identity include the previous values for updates and deletes
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Check the replica identity (a value of f means the setting took effect)
select relreplident from pg_class where relname='test0425';

OK, at this point the PostgreSQL-side setup is complete; all of the steps above are required.

5. Now for the code

Maven dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
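
The job below also writes to MySQL through Flink's JDBC table connector, which the dependencies above do not cover. A sketch of the extra artifacts that are typically needed for the sink; the exact versions are assumptions chosen to match Flink 1.13.0 and your MySQL server:

<!-- Assumed extras for the MySQL JDBC sink (not in the original dependency list) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

Depending on how the project is packaged, you may also need flink-table-planner-blink_2.11 at the same version for the Blink planner used in the code.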

Java code

package flinkTest.connect;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PgsqlToMysqlTest {
    public static void main(String[] args) {
        // Table environment settings: Blink planner in streaming mode
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Streaming execution environment
        StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        exeEnv.setParallelism(1);
        // Table execution environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings);
        // Source DDL: PostgreSQL CDC table
        String sourceDDL =
                "CREATE TABLE pgsql_source (\n" +
                        " id int,\n" +
                        " name STRING,\n" +
                        " py_code STRING,\n" +
                        " seq_no int,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'postgres-cdc',\n" +
                        " 'hostname' = '***',\n" +
                        " 'port' = '5432',\n" +
                        " 'username' = '***',\n" +
                        " 'password' = '***',\n" +
                        " 'database-name' = '***',\n" +
                        " 'schema-name' = 'public',\n" +
                        " 'decoding.plugin.name' = 'pgoutput',\n" +
                        " 'debezium.slot.name' = '***',\n" +
                        " 'table-name' = '***'\n" +
                        ")";
        // Sink DDL: MySQL table via the JDBC connector
        String sinkDDL =
                "CREATE TABLE mysql_sink (\n" +
                        " id int,\n" +
                        " name STRING,\n" +
                        " py_code STRING,\n" +
                        " seq_no int,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'url' = 'jdbc:mysql://ip:3306/DB?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8',\n" +
                        " 'username' = '***',\n" +
                        " 'password' = '***',\n" +
                        " 'table-name' = '***'\n" +
                        ")";
        // Continuous query that moves changes from the source to the sink
        String transformSQL =
                "INSERT INTO mysql_sink " +
                        "SELECT id,name,py_code,seq_no,description " +
                        "FROM pgsql_source";
        // Register the source table
        tableEnv.executeSql(sourceDDL);
        // Register the sink table
        tableEnv.executeSql(sinkDDL);
        // Submit the INSERT job
        TableResult tableResult = tableEnv.executeSql(transformSQL);
    }
}
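
Once the job is running, the postgres-cdc source creates a logical replication slot on the PostgreSQL server (named after debezium.slot.name). A quick sanity check, run on the source database:

-- Check that the connector's replication slot exists and is active
SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;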

Table structures:

-- PostgreSQL table structure
CREATE TABLE "public"."test" (
  "id" int4 NOT NULL,
  "name" varchar(50) COLLATE "pg_catalog"."default" NOT NULL,
  "py_code" varchar(50) COLLATE "pg_catalog"."default",
  "seq_no" int4 NOT NULL,
  "description" varchar(200) COLLATE "pg_catalog"."default",
  CONSTRAINT "pk_zd_business_type" PRIMARY KEY ("id")
);

-- MySQL table structure
CREATE TABLE `test` (
  `id` int(11) NOT NULL DEFAULT '0' COMMENT 'ID',
  `name` varchar(50) DEFAULT NULL COMMENT 'name',
  `py_code` varchar(50) DEFAULT NULL COMMENT 'mnemonic code',
  `seq_no` int(11) DEFAULT NULL COMMENT 'sort order',
  `description` varchar(200) DEFAULT NULL COMMENT 'remarks',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

6. Now you can operate on the source table: inserts, updates, and deletes will be synchronized to MySQL.
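
A simple end-to-end test is to run a few DML statements on the PostgreSQL source table and watch the corresponding row appear, change, and disappear in the MySQL test table (the values below are made up for illustration):

-- Run on PostgreSQL, then check the row in the MySQL `test` table after each statement
INSERT INTO "public"."test" (id, name, py_code, seq_no, description) VALUES (1, 'demo', 'dm', 1, 'first row');
UPDATE "public"."test" SET description = 'updated row' WHERE id = 1;
DELETE FROM "public"."test" WHERE id = 1;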