Apache Flink Talk Series - Stream-Table Duality
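The practical problem

Many big-data computing products offer users a SQL API, for example Hive, Spark and Flink. SQL, however, is the query language of traditional relational databases and was designed for batch queries. Hive and Spark are essentially batch engines (as described in "Apache Flink Talk Series - Overview", Spark uses micro-batching), so it is easy to see why they provide a SQL API. Flink, by contrast, is a native streaming engine, and stream and batch differ considerably in both their data sets and their computation: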
Batch query - finite data set; one query returns one result and then ends.
Streaming query - unbounded data set; one query keeps revising its result and never ends.
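The two query styles listed above can be contrasted with a minimal Python sketch. The click events are hypothetical sample data and the function names are invented for illustration; a real streaming job would never see the end of its input, so the finite list here only stands in for a prefix of the stream.

```python
from typing import Iterable, Iterator, List, Tuple

# Hypothetical click events as (time, user); the values are illustrative only.
CLICKS: List[Tuple[str, str]] = [
    ("22:36:53", "Mary"),
    ("22:37:06", "Bob"),
    ("22:37:15", "Mary"),
    ("22:37:27", "Llz"),
    ("22:37:36", "Bob"),
    ("22:37:45", "Mary"),
]


def batch_count(events: List[Tuple[str, str]]) -> int:
    """Batch query: the data set is finite, so one pass returns one result."""
    return len(events)


def streaming_count(events: Iterable[Tuple[str, str]]) -> Iterator[int]:
    """Streaming query: emit a revised result after every incoming event."""
    total = 0
    for _ in events:
        total += 1
        yield total


if __name__ == "__main__":
    print("batch result:", batch_count(CLICKS))
    print("streaming results:", list(streaming_count(CLICKS)))
```

The last value emitted by the streaming version equals the single value returned by the batch version over the same events.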
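Batch and streaming queries therefore differ greatly in both data set and computation. Why, then, can Apache Flink, which is built on native streaming, also offer users a SQL API?

The semantic relationship between stream and batch

SQL always operates on relational tables. In a traditional database, the table being queried does not change while the query runs: at the moment of the query the table is static, equivalent to a finite batch of data. This shows that SQL originated as a query language for batch computation. To explain why Apache Flink can also provide a SQL API, we first need to understand how stream and batch relate at the semantic level. Consider the concrete example in the figure below:

The figure above shows a stream of click events, each carrying a timestamp and a user name. We run a streaming count over these events and also trigger a batch computation at the last event of the stream. In stream processing, every incoming record triggers a computation. At the time slice where Mary's event arrives at 2018/4/30 22:37:45, both the streaming and the batch computation yield 6. In other words, given the same data source and the same query logic, stream and batch produce the same result. Because the same SQL gives the same final result in both modes, stream and batch are semantically identical.

The relationship between stream and table

Stream and batch are semantically consistent, and SQL operates on tables. The question of why Apache Flink can offer a SQL API therefore becomes the question of whether streams and tables are equivalent, which is the main topic of this article: why do streams and tables exhibit duality? As the figure below asks, can a table be viewed as a stream? Can a stream be viewed as a table? If so, under what conditions and with what adaptations?

MySQL master-slave replication

Before discussing the relationship between streams and tables, let us look at MySQL master-slave replication. The binlog is the core mechanism behind it. In brief, replication takes three steps: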
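1. The master records changes (change logs) in the binary log (binlog) as binary log events;
2. The slave copies the master's binary log events to its relay log;
3. The slave replays the events in the relay log, applying the changes to its own data.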
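The process is shown in the figure below:

binlog

Next we look at the binlog logging modes, the binlog format, and the actual contents of a binlog, to show in detail how the binlog relates to a table.

binlog modes

As described above, MySQL replication is built on the binlog. What does the binlog record? It records every operation that changes the database: inserts, deletes, updates and so on. MySQL supports three ways of recording the binlog: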
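statement-based logging - Events contain SQL statements that produce data changes (inserts, updates, deletes);
row-based logging - Events describe changes to individual rows;
mixed logging - Statement-based by default; the server switches to row-based automatically in cases such as the following:

- The NDB storage engine is used (DML operations are logged in row format);
- Non-deterministic functions such as UUID(), USER(), CURRENT_USER() or FOUND_ROWS() are used;
- An INSERT DELAYED statement is used;
- A user-defined function (UDF) is used;
- A temporary table is used.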
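Why a non-deterministic function forces row-based logging can be illustrated with a toy model in Python. This is only a sketch under simplifying assumptions, not how MySQL implements its logs: a Python callable stands in for a SQL statement that calls UUID(), and all names are hypothetical.

```python
import uuid
from typing import Callable, Dict, List, Tuple

Statement = Callable[[], str]


def run_on_master() -> Tuple[Dict[int, str], List[Statement], List[dict]]:
    """Execute one non-deterministic 'statement' and log it in both styles."""
    statement: Statement = lambda: str(uuid.uuid4())  # stands in for UUID()
    value = statement()                    # value actually stored on the master
    statement_log = [statement]            # statement-based: log the statement
    row_log = [{"id": 1, "token": value}]  # row-based: log the resulting row
    master_table = {1: value}
    return master_table, statement_log, row_log


def replay_statements(log: List[Statement]) -> Dict[int, str]:
    """The replica re-executes each statement, so the function runs again."""
    return {i + 1: stmt() for i, stmt in enumerate(log)}


def replay_rows(log: List[dict]) -> Dict[int, str]:
    """The replica applies the logged row images verbatim."""
    return {row["id"]: row["token"] for row in log}


if __name__ == "__main__":
    master, statement_log, row_log = run_on_master()
    print("row replay matches master:", replay_rows(row_log) == master)
    print("statement replay matches master:",
          replay_statements(statement_log) == master)
```

Replaying the row images reproduces the master's data exactly, whereas re-running the statement produces a fresh random value on the replica.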
binlog format

Taking row-based mode as the example, let us look at how the binlog is stored. Every binary log event is a sequence of bytes made up of two parts:

event header
event data
The exact format of the event header and event data differs slightly between database versions, but the common parts are as follows:

+=====================================+
| event  | timestamp         0 : 4    |
| header +----------------------------+
|        | type_code         4 : 1    |
|        +----------------------------+
|        | server_id         5 : 4    |
|        +----------------------------+
|        | event_length      9 : 4    |
|        +----------------------------+
|        | version-specific (omitted) |
+=====================================+
| event  | fixed part                 |
| data   +----------------------------+
|        | variable part              |
+=====================================+

Note that the binlog event header contains a timestamp field. It records when each change happened and therefore the order of the changes, and the slave replays the log strictly in that order during replication.

Generating the binlog

To see how the binlog is produced, we run some real operations against MySQL. The binlog is stored in binary form, so below we use a tool to view its content as text.

Check whether the binlog is enabled:

mysql> show variables like 'log_bin'
    -> ;
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

Check the binlog mode (row-based mode is needed here):

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

Clear the existing binlog:

mysql> reset master;
Query OK, 0 rows affected (0.00 sec)

Create a table for the experiment:

mysql> create table tab(
    -> id INT NOT NULL AUTO_INCREMENT,
    -> user VARCHAR(100) NOT NULL,
    -> clicks INT NOT NULL,
    -> PRIMARY KEY (id)
    -> );
Query OK, 0 rows affected (0.10 sec)
mysql> show tables;
+-------------------+
| Tables_in_blinkdb |
+-------------------+
| tab               |
+-------------------+
1 row in set (0.00 sec)

Run some DML operations:

mysql> insert into tab(user, clicks) values ('Mary', 1);
Query OK, 1 row affected (0.03 sec)

mysql> insert into tab(user, clicks) values ('Bob', 1);
Query OK, 1 row affected (0.08 sec)

mysql> update tab set clicks=2 where user='Mary'
    -> ;
Query OK, 1 row affected (0.06 sec)
Rows matched: 1 Changed: 1 Warnings: 0

mysql> insert into tab(user, clicks) values ('Llz', 1);
Query OK, 1 row affected (0.08 sec)

mysql> update tab set clicks=2 where user='Bob';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0

mysql> update tab set clicks=3 where user='Mary';
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0

mysql> select * from tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
| 1 | Mary | 3 |
| 2 | Bob | 2 |
| 3 | Llz | 1 |
+----+------+--------+
3 rows in set (0.00 sec)
Check which binlog file is currently being written:
mysql> show master status\G
*************************** 1. row ***************************
File: binlog.000001
Position: 2547
Binlog_Do_DB:
Binlog_Ignore_DB:
Executed_Gtid_Set:
1 row in set (0.00 sec)
The file binlog.000001 shown above is the binlog currently in use.
List the events recorded in binlog.000001:
mysql> show binlog events in 'binlog.000001';
+---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
+---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| binlog.000001 | 4 | Format_desc | 1 | 124 | Server ver: 8.0.11, Binlog ver: 4 |
| binlog.000001 | 124 | Previous_gtids | 1 | 155 | |
| binlog.000001 | 155 | Anonymous_Gtid | 1 | 228 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| binlog.000001 | 228 | Query | 1 | 368 | use `blinkdb`; DROP TABLE `tab` /* generated by server */ /* xid=22 */ |
| binlog.000001 | 368 | Anonymous_Gtid | 1 | 443 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| binlog.000001 | 443 | Query | 1 | 670 | use `blinkdb`; create table tab(
id INT NOT NULL AUTO_INCREMENT,
user VARCHAR(100) NOT NULL,
clicks INT NOT NULL,
PRIMARY KEY (id)
) /* xid=23 */ |
| binlog.000001 | 670 | Anonymous_Gtid | 1 | 745 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| binlog.000001 | 745 | Query | 1 | 823 | BEGIN |
| binlog.000001 | 823 | Table_map | 1 | 890 | table_id: 96 (blinkdb.tab) |
| binlog.000001 | 890 | Write_rows | 1 | 940 | table_id: 96 flags: STMT_END_F |
| binlog.000001 | 940 | Xid | 1 | 971 | COMMIT /* xid=25 */ |
| binlog.000001 | 971 | Anonymous_Gtid | 1 | 1046 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| binlog.000001 | 1046 | Query | 1 | 1124 | BEGIN |
| binlog.000001 | 1124 | Table_map | 1 | 1191 | table_id: 96 (blinkdb.tab) |
| binlog.000001 | 1191 | Write_rows | 1 | 1240 | table_id: 96 flags: STMT_END_F |
| binlog.000001 | 1240 | Xid | 1 | 1271 | COMMIT /* xid=26 */ |
| binlog.000001 | 1271 | Anonymous_Gtid | 1 | 1346 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| binlog.000001 | 1346 | Query | 1 | 1433 | BEGIN |
| binlog.000001 | 1433 | Table_map | 1 | 1500 | table_id: 96 (blinkdb.tab) |
| binlog.000001 | 1500 | Update_rows | 1 | 1566 | table_id: 96 flags: STMT_END_F |
| binlog.000001 | 1566 | Xid | 1 | 1597 | COMMIT /* xid=27 */ |
| binlog.000001 | 1597 | Anonymous_Gtid | 1 | 1672 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| binlog.000001 | 1672 | Query | 1 | 1750 | BEGIN |
| binlog.000001 | 1750 | Table_map | 1 | 1817 | table_id: 96 (blinkdb.tab) |
| binlog.000001 | 1817 | Write_rows | 1 | 1866 | table_id: 96 flags: STMT_END_F |
| binlog.000001 | 1866 | Xid | 1 | 1897 | COMMIT /* xid=28 */ |
| binlog.000001 | 1897 | Anonymous_Gtid | 1 | 1972 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| binlog.000001 | 1972 | Query | 1 | 2059 | BEGIN |
| binlog.000001 | 2059 | Table_map | 1 | 2126 | table_id: 96 (blinkdb.tab) |
| binlog.000001 | 2126 | Update_rows | 1 | 2190 | table_id: 96 flags: STMT_END_F |
| binlog.000001 | 2190 | Xid | 1 | 2221 | COMMIT /* xid=29 */ |
| binlog.000001 | 2221 | Anonymous_Gtid | 1 | 2296 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| binlog.000001 | 2296 | Query | 1 | 2383 | BEGIN |
| binlog.000001 | 2383 | Table_map | 1 | 2450 | table_id: 96 (blinkdb.tab) |
| binlog.000001 | 2450 | Update_rows | 1 | 2516 | table_id: 96 flags: STMT_END_F |
| binlog.000001 | 2516 | Xid | 1 | 2547 | COMMIT /* xid=30 */ |
+---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
36 rows in set (0.00 sec)
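We performed three inserts and three updates above, and the binlog accordingly contains three Write_rows and three Update_rows events, recorded in the same order as the operations. Next, let us look at the timestamps and the plain-text data of these Write_rows and Update_rows events.
Export the binlog as plain text:
sudo mysqlbinlog --start-datetime='2018-04-29 00:00:03' --stop-datetime='2018-05-02 00:30:00' --base64-output=decode-rows -v /usr/local/MySQL/data/binlog.000001 > ~/binlog.txt
Opening binlog.txt shows the following content: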
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#180430 22:29:33 server id 1 end_log_pos 124 CRC32 0xff61797c Start: binlog v 4, server v 8.0.11 created 180430 22:29:33 at startup
# Warning: this binlog is either in use or was not closed properly.
ROLLBACK/*!*/;
# at 124
#180430 22:29:33 server id 1 end_log_pos 155 CRC32 0x629ae755 Previous-GTIDs
# [empty]
# at 155
#180430 22:32:11 server id 1 end_log_pos 228 CRC32 0xbde49fca Anonymous_GTID last_committed=0 sequence_number=1 rbr_only=no original_committed_timestamp=1525098731207902 immediate_commit_timestamp=1525098731207902 transaction_length=213
# original_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
# immediate_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098731207902*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 228
#180430 22:32:11 server id 1 end_log_pos 368 CRC32 0xe5f330e7 Query thread_id=9 exec_time=0 error_code=0 Xid = 22
use `blinkdb`/*!*/;
SET TIMESTAMP=1525098731/*!*/;
SET @@session.pseudo_thread_id=9/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1168113696/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=255,@@session.collation_connection=255,@@session.collation_server=255/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
/*!80005 SET @@session.default_collation_for_utf8mb4=255*//*!*/;
DROP TABLE `tab` /* generated by server */
/*!*/;
# at 368
#180430 22:32:21 server id 1 end_log_pos 443 CRC32 0x50e5acb7 Anonymous_GTID last_committed=1 sequence_number=2 rbr_only=no original_committed_timestamp=1525098741628960 immediate_commit_timestamp=1525098741628960 transaction_length=302
# original_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
# immediate_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098741628960*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 443
#180430 22:32:21 server id 1 end_log_pos 670 CRC32 0xe1353dd6 Query thread_id=9 exec_time=0 error_code=0 Xid = 23
SET TIMESTAMP=1525098741/*!*/;
create table tab(
id INT NOT NULL AUTO_INCREMENT,
user VARCHAR(100) NOT NULL,
clicks INT NOT NULL,
PRIMARY KEY (id)
)
/*!*/;
# at 670
#180430 22:36:53 server id 1 end_log_pos 745 CRC32 0xcf436fbb Anonymous_GTID last_committed=2 sequence_number=3 rbr_only=yes original_committed_timestamp=1525099013988373 immediate_commit_timestamp=1525099013988373 transaction_length=301
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
# immediate_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099013988373*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 745
#180430 22:36:53 server id 1 end_log_pos 823 CRC32 0x71c64dd2 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099013/*!*/;
BEGIN
/*!*/;
# at 823
#180430 22:36:53 server id 1 end_log_pos 890 CRC32 0x63792f6b Table_map: `blinkdb`.`tab` mapped to number 96
# at 890
#180430 22:36:53 server id 1 end_log_pos 940 CRC32 0xf2dade22 Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `blinkdb`.`tab`
### SET
###   @1=1
###   @2='Mary'
###   @3=1
# at 940
#180430 22:36:53 server id 1 end_log_pos 971 CRC32 0x7db3e61e Xid = 25
COMMIT/*!*/;
# at 971
#180430 22:37:06 server id 1 end_log_pos 1046 CRC32 0xd05dd12c Anonymous_GTID last_committed=3 sequence_number=4 rbr_only=yes original_committed_timestamp=1525099026328547 immediate_commit_timestamp=1525099026328547 transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
# immediate_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099026328547*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1046
#180430 22:37:06 server id 1 end_log_pos 1124 CRC32 0x80f259e0 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099026/*!*/;
BEGIN
/*!*/;
# at 1124
#180430 22:37:06 server id 1 end_log_pos 1191 CRC32 0x255903ba Table_map: `blinkdb`.`tab` mapped to number 96
# at 1191
#180430 22:37:06 server id 1 end_log_pos 1240 CRC32 0xe76bfc79 Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `blinkdb`.`tab`
### SET
###   @1=2
###   @2='Bob'
###   @3=1
# at 1240
#180430 22:37:06 server id 1 end_log_pos 1271 CRC32 0x83cddfef Xid = 26
COMMIT/*!*/;
# at 1271
#180430 22:37:15 server id 1 end_log_pos 1346 CRC32 0x7095baee Anonymous_GTID last_committed=4 sequence_number=5 rbr_only=yes original_committed_timestamp=1525099035811597 immediate_commit_timestamp=1525099035811597 transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
# immediate_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099035811597*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1346
#180430 22:37:15 server id 1 end_log_pos 1433 CRC32 0x70ef97e2 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099035/*!*/;
BEGIN
/*!*/;
# at 1433
#180430 22:37:15 server id 1 end_log_pos 1500 CRC32 0x75f1f399 Table_map: `blinkdb`.`tab` mapped to number 96
# at 1500
#180430 22:37:15 server id 1 end_log_pos 1566 CRC32 0x256bd4b8 Update_rows: table id 96 flags: STMT_END_F
### UPDATE `blinkdb`.`tab`
### WHERE
###   @1=1
###   @2='Mary'
###   @3=1
### SET
###   @1=1
###   @2='Mary'
###   @3=2
# at 1566
#180430 22:37:15 server id 1 end_log_pos 1597 CRC32 0x93c86579 Xid = 27
COMMIT/*!*/;
# at 1597
#180430 22:37:27 server id 1 end_log_pos 1672 CRC32 0xe8bd63e7 Anonymous_GTID last_committed=5 sequence_number=6 rbr_only=yes original_committed_timestamp=1525099047219517 immediate_commit_timestamp=1525099047219517 transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
# immediate_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099047219517*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1672
#180430 22:37:27 server id 1 end_log_pos 1750 CRC32 0x5356c3c7 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099047/*!*/;
BEGIN
/*!*/;
# at 1750
#180430 22:37:27 server id 1 end_log_pos 1817 CRC32 0x37e6b1ce Table_map: `blinkdb`.`tab` mapped to number 96
# at 1817
#180430 22:37:27 server id 1 end_log_pos 1866 CRC32 0x6ab1bbe6 Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `blinkdb`.`tab`
### SET
###   @1=3
###   @2='Llz'
###   @3=1
# at 1866
#180430 22:37:27 server id 1 end_log_pos 1897 CRC32 0x3b62b153 Xid = 28
COMMIT/*!*/;
# at 1897
#180430 22:37:36 server id 1 end_log_pos 1972 CRC32 0x603134c1 Anonymous_GTID last_committed=6 sequence_number=7 rbr_only=yes original_committed_timestamp=1525099056866022 immediate_commit_timestamp=1525099056866022 transaction_length=324
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
# immediate_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099056866022*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1972
#180430 22:37:36 server id 1 end_log_pos 2059 CRC32 0xe17df4e4 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099056/*!*/;
BEGIN
/*!*/;
# at 2059
#180430 22:37:36 server id 1 end_log_pos 2126 CRC32 0x53888b05 Table_map: `blinkdb`.`tab` mapped to number 96
# at 2126
#180430 22:37:36 server id 1 end_log_pos 2190 CRC32 0x85f34996 Update_rows: table id 96 flags: STMT_END_F
### UPDATE `blinkdb`.`tab`
### WHERE
###   @1=2
###   @2='Bob'
###   @3=1
### SET
###   @1=2
###   @2='Bob'
###   @3=2
# at 2190
#180430 22:37:36 server id 1 end_log_pos 2221 CRC32 0x877f1e23 Xid = 29
COMMIT/*!*/;
# at 2221
#180430 22:37:45 server id 1 end_log_pos 2296 CRC32 0xfbc7e868 Anonymous_GTID last_committed=7 sequence_number=8 rbr_only=yes original_committed_timestamp=1525099065089940 immediate_commit_timestamp=1525099065089940 transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
# immediate_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099065089940*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 2296
#180430 22:37:45 server id 1 end_log_pos 2383 CRC32 0x8a514364 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099065/*!*/;
BEGIN
/*!*/;
# at 2383
#180430 22:37:45 server id 1 end_log_pos 2450 CRC32 0xdf18ca60 Table_map: `blinkdb`.`tab` mapped to number 96
# at 2450
#180430 22:37:45 server id 1 end_log_pos 2516 CRC32 0xd50de69f Update_rows: table id 96 flags: STMT_END_F
### UPDATE `blinkdb`.`tab`
### WHERE
###   @1=1
###   @2='Mary'
###   @3=2
### SET
###   @1=1
###   @2='Mary'
###   @3=3
# at 2516
#180430 22:37:45 server id 1 end_log_pos 2547 CRC32 0x94f89393 Xid = 30
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
Mapping the operations to the binlog records:

DML | binlog header (timestamp) | data
insert into blink_tab(user, clicks) values ('Mary', 1); | 1525099013 (2018/4/30 22:36:53) | INSERT INTO blinkdb.blink_tab SET @1=1 @2='Mary' @3=1
insert into blink_tab(user, clicks) values ('Bob', 1); | 1525099026 (2018/4/30 22:37:06) | INSERT INTO blinkdb.blink_tab SET @1=2 @2='Bob' @3=1
update blink_tab set clicks=2 where user='Mary'; | 1525099035 (2018/4/30 22:37:15) | UPDATE blinkdb.blink_tab WHERE @1=1 @2='Mary' @3=1 SET @1=1 @2='Mary' @3=2
insert into blink_tab(user, clicks) values ('Llz', 1); | 1525099047 (2018/4/30 22:37:27) | INSERT INTO blinkdb.blink_tab SET @1=3 @2='Llz' @3=1
update blink_tab set clicks=2 where user='Bob'; | 1525099056 (2018/4/30 22:37:36) | UPDATE blinkdb.blink_tab WHERE @1=2 @2='Bob' @3=1 SET @1=2 @2='Bob' @3=2
update blink_tab set clicks=3 where user='Mary'; | 1525099065 (2018/4/30 22:37:45) | UPDATE blinkdb.blink_tab WHERE @1=1 @2='Mary' @3=2 SET @1=1 @2='Mary' @3=3
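How a header timestamp such as 1525099013 relates to the wall-clock time 22:36:53 can be checked with a short Python sketch. It decodes the four common header fields from the layout in the binlog format section, assuming they are little-endian unsigned integers, and assumes that CST in the dump means UTC+8. The sample bytes are built by hand; the type code 30 is purely illustrative, and the length 50 simply mirrors the 890 to 940 span of the first Write_rows event.

```python
import struct
from datetime import datetime, timedelta, timezone

# Assumption: timestamp(4) | type_code(1) | server_id(4) | event_length(4),
# all little-endian and unsigned, with no padding between fields.
COMMON_HEADER = struct.Struct("<IBII")

# Assumption: "CST" in the dump is China Standard Time, i.e. UTC+8.
CST = timezone(timedelta(hours=8))


def parse_common_header(raw: bytes) -> dict:
    """Decode the first 13 bytes of a binlog event header."""
    timestamp, type_code, server_id, event_length = COMMON_HEADER.unpack_from(raw, 0)
    return {
        "timestamp": timestamp,
        "type_code": type_code,
        "server_id": server_id,
        "event_length": event_length,
    }


def to_cst(timestamp: int) -> str:
    """Render a Unix timestamp (seconds) as local time in UTC+8."""
    return datetime.fromtimestamp(timestamp, tz=CST).strftime("%Y-%m-%d %H:%M:%S")


# Hand-built sample header; type_code 30 and event_length 50 are illustrative.
sample = COMMON_HEADER.pack(1525099013, 30, 1, 50)
header = parse_common_header(sample)

if __name__ == "__main__":
    print(header)
    print(to_cst(header["timestamp"]))
```

Because the timestamp has one-second resolution, events of the same transaction share the same value, which matches the repeated times in the dump.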
Simplifying the binlog gives:
timestamp user clicks
1525099013 Mary 1
1525099026 Bob 1
1525099035 Mary 2
1525099047 Llz 1
1525099056 Bob 2
1525099065 Mary 3
Replaying the binlog in timestamp order yields the following table:
user clicks
Mary 3
Bob 2
Llz 1
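The replay described above can be sketched in a few lines of Python. This is a simplified model, not MySQL's replication code: it keys the table on the user name rather than the id column, and it treats every event as an upsert, which is enough for the inserts and updates in this example.

```python
from typing import Dict, Iterable, Tuple

# The simplified binlog from the text: (timestamp, user, clicks).
BINLOG = [
    (1525099013, "Mary", 1),
    (1525099026, "Bob", 1),
    (1525099035, "Mary", 2),
    (1525099047, "Llz", 1),
    (1525099056, "Bob", 2),
    (1525099065, "Mary", 3),
]


def replay(events: Iterable[Tuple[int, str, int]]) -> Dict[str, int]:
    """Apply the events in timestamp order and return the resulting table."""
    table: Dict[str, int] = {}
    for _, user, clicks in sorted(events, key=lambda event: event[0]):
        table[user] = clicks  # insert a new row or overwrite the existing one
    return table


if __name__ == "__main__":
    for user, clicks in replay(BINLOG).items():
        print(user, clicks)
```

Replaying the same events always produces the same table, which is what lets a replica converge on the master's state.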
The relationship between the table and the binlog is sketched below:
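Stream-table duality

We have spent some time on MySQL replication and the binlog data format. The binlog carries timestamps: every operation on a table is recorded in time order to form the binlog, replaying the binlog events is a form of stream processing, and the result of the replay is again a table. In other words, operations on a table produce a stream of timestamped events, and processing that stream produces a continuously changing table. Tables and streams are equivalent and can be converted into each other. As time passes and DML operations continue, the table content keeps changing, as shown below: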
As the figure above shows, streams and tables share the same characteristics:
Table - schema, data, DML operation time
Stream - schema, data, data processing time
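Both directions of the conversion can be illustrated with a toy model in Python. The DynamicTable class and snapshot_at function are hypothetical names, not Flink APIs: DML on the table emits a timestamped changelog (table to stream), and replaying that changelog up to any time t rebuilds the table as it was at t (stream to table). The sketch assumes the changelog is appended in timestamp order.

```python
from typing import Dict, List, Tuple

Event = Tuple[int, str, str, int]  # (timestamp, operation, user, clicks)


class DynamicTable:
    """Toy table that records every DML operation as a timestamped event."""

    def __init__(self) -> None:
        self.rows: Dict[str, int] = {}
        self.changelog: List[Event] = []

    def upsert(self, ts: int, user: str, clicks: int) -> None:
        op = "UPDATE" if user in self.rows else "INSERT"
        self.rows[user] = clicks
        self.changelog.append((ts, op, user, clicks))

    def delete(self, ts: int, user: str) -> None:
        del self.rows[user]
        self.changelog.append((ts, "DELETE", user, 0))


def snapshot_at(changelog: List[Event], ts: int) -> Dict[str, int]:
    """Rebuild the table as of time ts by replaying the stream up to ts."""
    rows: Dict[str, int] = {}
    for event_ts, op, user, clicks in changelog:
        if event_ts > ts:
            break  # relies on the changelog being in timestamp order
        if op == "DELETE":
            rows.pop(user, None)
        else:
            rows[user] = clicks
    return rows


table = DynamicTable()
for ts, user, clicks in [
    (1525099013, "Mary", 1), (1525099026, "Bob", 1), (1525099035, "Mary", 2),
    (1525099047, "Llz", 1), (1525099056, "Bob", 2), (1525099065, "Mary", 3),
]:
    table.upsert(ts, user, clicks)

if __name__ == "__main__":
    print(snapshot_at(table.changelog, 1525099035))
    print(snapshot_at(table.changelog, 1525099065) == table.rows)
```

Every prefix of the stream corresponds to exactly one snapshot of the table, and the full stream reproduces the current table.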
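Although most tables do not explicitly expose the time of each DML operation, the database system does keep this information internally, and it corresponds to the processing time or event time of data on a stream. Because streams and tables share these characteristics and can be converted into each other without losing information, I call this property stream-table duality.

On a stream, the table described above is called a dynamic table (Dynamic Table): every event arriving on the stream is an update to the table's data (including inserts and deletes), so the table content changes continuously, and at any moment the state of the stream corresponds one-to-one to a snapshot of the table. A stream and its dynamic table are equivalent along the time dimension, and this equivalence is what we call the duality of streams and dynamic tables.

Summary

This article explained why Apache Flink, a stream computing platform, can offer users a SQL API. The fundamental reason is that, if the data on a stream is treated as structured data, the core of a streaming job is to transform one structured data set with a time attribute into another structured data set with a time attribute. The binlog, which records how a table's data changes, is exactly such a time-attributed stream. Streams and tables can be converted into each other without loss of information, and this stream-table duality is what allows Apache Flink to use SQL as the language for developing streaming jobs.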