1. 程式人生 > >sqoop用法之mysql與hive資料匯入匯出

sqoop用法之mysql與hive資料匯入匯出

[TOC] ## 一. Sqoop介紹 `Sqoop`是一個用來將`Hadoop`和關係型資料庫中的資料相互轉移的工具,可以將一個關係型資料庫(例如:`MySQL、Oracle、Postgres`等)中的資料導進到`Hadoop`的`HDFS`中,也可以將`HDFS`的資料導進到關係型資料庫中。對於某些`NoSQL`資料庫它也提供了聯結器。`Sqoop`,類似於其他`ETL`工具,使用元資料模型來判斷資料型別並在資料從資料來源轉移到`Hadoop`時確保型別安全的資料處理。`Sqoop`專為大資料批量傳輸設計,能夠分割資料集並建立`Hadoop`任務來處理每個區塊。 ![](https://img2020.cnblogs.com/blog/1165270/202012/1165270-20201222215720213-2120426376.png) **本文版本說明** >`hadoop`版本 : `hadoop-2.7.2` >`hive版本` : `hive-2.1.0` >sqoop版本:`sqoop-1.4.6` ## 二. Mysql 資料匯入到 Hive 1). 將`mysql`的`people_access_log`表匯入到`hive`表`web.people_access_log`,並且`hive`中的表不存在。 `mysql`中表`people_access_log`資料為: ```java 1,15110101010,1577003281739,'112.168.1.2','https://www.baidu.com' 2,15110101011,1577003281749,'112.16.1.23','https://www.baidu.com' 3,15110101012,1577003281759,'193.168.1.2','https://www.taobao.com' 4,15110101013,1577003281769,'112.18.1.2','https://www.baidu.com' 5,15110101014,1577003281779,'112.168.10.2','https://www.baidu.com' 6,15110101015,1577003281789,'11.168.1.2','https://www.taobao.com' ``` 將`mysql`資料匯入`hive`的命令為: ```bash sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log \ -m 1 \ --hive-import \ --create-hive-table \ --fields-terminated-by '\t' \ --hive-table web.people_access_log ``` 該命令會啟用一個`mapreduce`任務,將`mysql`資料匯入到`hive`表,並且指定了`hive`表的分隔符為`\t`,如果不指定則為預設分隔符`^A(ctrl+A)`。 **引數說明** |引數|說明| |:---:|:---:| |`--connect`|`mysql`的連線資訊| |`--username`|`mysql`的使用者名稱| |`--password`|`mysql`的密碼| |`--table`|被匯入的`mysql`源表名| |`-m`|並行匯入啟用的`map`任務數量,與`--num-mapper`含義一樣| |`--hive-import`|插入資料到`hive`當中,使用`hive`預設的分隔符,可以使用`--fields-terminated-by`引數來指定分隔符。| |`-- hive-table`|hive當中的表名| 2). 也可以通過`--query`條件查詢`Mysql`資料,將查詢結果匯入到`Hive` ```bash sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --query 'select * from people_access_log where \$CONDITIONS and url = "https://www.baidu.com"' \ --target-dir /user/hive/warehouse/web/people_access_log \ --delete-target-dir \ --fields-terminated-by '\t' \ -m 1 ``` |引數|說明| |:---:|:---:| |`--query`|後接查詢語句,條件查詢需要`\$CONDITIONS and`連線查詢條件,這裡的`\$`表示轉義`$`,必須有.| |`--delete-target-dir`|如果目標`hive`表目錄存在,則刪除,相當於`overwrite`.| ## 三. Hive資料匯入到Mysql 還是使用上面的`hive`表`web.people_access_log`,將其匯入到`mysql`中的`people_access_log_out`表中. ```bash sqoop export \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log_out \ --input-fields-terminated-by '\t' \ --export-dir /user/hive/warehouse/web.db/people_access_log \ --num-mappers 1 ``` 注意:`mysql`表`people_access_log_out`需要提前建好,否則報錯:`ErrorException: Table 'test.people_access_log_out' doesn't exist`。如果有`id`自增列,`hive`表也需要有,`hive`表與`mysql`表字段必須完全相同。 ```sql create table people_access_log_out like people_access_log; ``` 執行完一個`mr`任務後,成功匯入到`mysql`表`people_access_log_out`中. ## 四. mysql資料增量匯入hive 實際中`mysql`資料會不斷增加,這時候需要用`sqoop`將資料增量匯入`hive`,然後進行海量資料分析統計。增量資料匯入分兩種,一是基於遞增列的增量資料匯入(`Append`方式)。二是基於時間列的增量資料匯入(`LastModified`方式)。有幾個核心引數: - `–check-column`:用來指定一些列,這些列在增量匯入時用來檢查這些資料是否作為增量資料進行匯入,和關係型資料庫中的自增欄位及時間戳類似.**注意**:這些被指定的列的型別不能使任意字元型別,如char、varchar等型別都是不可以的,同時`–check-column`可以去指定多個列 - `–incremental`:用來指定增量匯入的模式,兩種模式分別為`Append`和`Lastmodified` - `–last-value`:指定上一次匯入中檢查列指定欄位最大值 ### 1. 基於遞增列Append匯入 接著前面的日誌表,裡面每行有一個唯一標識自增列`ID`,在關係型資料庫中以主鍵形式存在。之前已經將id在`0~6`之間的編號的訂單匯入到`Hadoop`中了(這裡為`HDFS`),現在一段時間後我們需要將近期產生的新的訂 單資料匯入`Hadoop`中(這裡為`HDFS`),以供後續數倉進行分析。此時我們只需要指定`–incremental` 引數為`append`,`–last-value`引數為`6`即可。表示只從`id`大於`6`後即`7`開始匯入。 #### 1). 建立`hive`表 首先我們需要建立一張與`mysql`結構相同的`hive`表,假設指定欄位分隔符為`\t`,後面匯入資料時候分隔符也需要保持一致。 #### 2). 建立`job` 增量匯入肯定是多次進行的,可能每隔一個小時、一天等,所以需要建立計劃任務,然後定時執行即可。我們都知道`hive`的資料是存在`hdfs`上面的,我們建立`sqoop job`的時候需要指定`hive`的資料表對應的`hdfs`目錄,然後定時執行這個`job`即可。 當前`mysql`中資料,`hive`中資料與`mysql`一樣也有6條: |`id`|`user_id`|`access_time`|`ip`|`url`| |:---:|:---:|:---:|:---:|:---:| |1|15110101010|1577003281739|112.168.1.2|https://www.baidu.com| |2|15110101011|1577003281749|112.16.1.23|https://www.baidu.com| |3|15110101012|1577003281759|193.168.1.2|https://www.taobao.com| |4|15110101013|1577003281769|112.18.1.2|https://www.baidu.com| |5|15110101014|1577003281779|112.168.10.2|https://www.baidu.com| |6|15110101015|1577003281789|11.168.1.2|https://www.taobao.com| 增量匯入有幾個引數,保證下次同步的時候可以接著上次繼續同步. ```bash sqoop job --create mysql2hive_job -- import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log \ --target-dir /user/hive/warehouse/web.db/people_access_log \ --check-column id \ --incremental append \ --fields-terminated-by '\t' \ --last-value 6 \ -m 1 ``` 這裡通過`sqoop job --create job_name`命令建立了一個名為`mysql2hive_job`的`sqoop job`。 #### 3). 執行job 建立好了`job`,後面只需要定時週期執行這個提前定義好的`job`即可。我們先往`mysql`裡面插入2條資料。 ```sql INSERT INTO `people_access_log` (`id`,`user_id`,`access_time`,`ip`,`url`) VALUES (7,15110101016,1577003281790,'112.168.1.3','https://www.qq.com'), (8,15110101017,1577003281791,'112.1.1.3','https://www.microsoft.com'); ``` 這樣`mysql`裡面就會多了2條資料。此時`hive`裡面只有`id`為`1 ~ 6`的資料,執行同步`job`使用以下命令。 ```bash sqoop job -exec mysql2hive_job ``` 執行完成後,發現剛才`mysql`新加入的`id`為`7 ~ 8`的兩條資料已經同步到`hive`。 ```sql hive> select * from web.people_access_log; OK 1 15110101010 1577003281739 112.168.1.2 https://www.baidu.com 2 15110101011 1577003281749 112.16.1.23 https://www.baidu.com 3 15110101012 1577003281759 193.168.1.2 https://www.taobao.com 4 15110101013 1577003281769 112.18.1.2 https://www.baidu.com 5 15110101014 1577003281779 112.168.10.2 https://www.baidu.com 6 15110101015 1577003281789 11.168.1.2 https://www.taobao.com 7 15110101016 1577003281790 112.168.1.3 https://www.qq.com 8 15110101017 1577003281791 112.1.1.3 https://www.microsoft.com ``` 由於實際場景中,`mysql`表中的資料,比如訂單表等,通常是一致有資料進入的,這時候只需要將`sqoop job -exec mysql2hive_job`這個命令定時(比如說10分鐘頻率)執行一次,就能將資料10分鐘同步一次到`hive`資料倉庫。 ### 2. `Lastmodified` 匯入實戰 `append`適合業務系統庫,一般業務系統表會通過自增ID作為主鍵標識唯一性。`Lastmodified`適合`ETL`的資料根據時間戳欄位匯入,表示只匯入比這個時間戳大,即比這個時間晚的資料。 #### 1). 新建一張表 在`mysql`中新建一張表`people_access_log2`,並且初始化幾條資料: ```sql CREATE TABLE `people_access_log2` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', `user_id` bigint(20) unsigned NOT NULL COMMENT '使用者id', `access_time` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `ip` varchar(15) NOT NULL COMMENT '訪客ip', `url` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ``` 插入資料: ```sql insert into people_access_log2(id,user_id, ip, url) values(1,15110101010,'112.168.1.200','https://www.baidu.com'); insert into people_access_log2(id,user_id, ip, url) values(2,15110101011,'112.16.1.2','https://www.baidu.com'); insert into people_access_log2(id,user_id, ip, url) values(3,15110101012,'112.168.1.2','https://www.taobao.com'); insert into people_access_log2(id,user_id, ip, url) values(4,15110101013,'112.168.10.2','https://www.baidu.com'); insert into people_access_log2(id,user_id, ip, url) values(5,15110101014,'112.168.1.2','https://www.jd.com'); insert into people_access_log2(id,user_id, ip, url) values(6,15110101015,'112.168.12.4','https://www.qq.com'); ``` `mysql`裡面的資料就是這樣: |id|user_id|access_time|ip|url| |:---:|:---:|:---:|:---:|:---:| |`1`|`15110101010`|`2019-12-28 16:23:10`|`112.168.1.200`|`https://www.baidu.com`| |`2`|`15110101011`|`2019-12-28 16:23:33`|`112.16.1.2`|`https://www.baidu.com`| |`3`|`15110101012`|`2019-12-28 16:23:41`|`112.168.1.2`|`https://www.taobao.com`| |`4`|`15110101013`|`2019-12-28 16:23:46`|`112.168.10.2`|`https://www.baidu.com`| |`5`|`15110101014`|`2019-12-28 16:23:52`|`112.168.1.2`|`https://www.jd.com`| |`6`|`15110101015`|`2019-12-28 16:23:56`|`112.168.12.4`|`https://www.qq.`| #### 2). 初始化`hive`表: 初始化`hive`資料,將`mysql`裡面的`6`條資料匯入`hive`中,並且可以自動幫助我們建立對應`hive`表,何樂而不為,否則我們需要自己手動建立,完成初始化工作。 ```bash sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log2 \ --hive-import \ --create-hive-table \ --fields-terminated-by ',' \ --hive-table web.people_access_log2 ``` 可以看到執行該命令後,啟動了二一個`mapreduce`任務,這樣6條資料就進入`hive`表`web.people_access_log2`了: ```sql hive> select * from web.people_access_log2; OK 1 15110101010 2019-12-28 16:23:10.0 112.168.1.200 https://www.baidu.com 2 15110101011 2019-12-28 16:23:33.0 112.16.1.2 https://www.baidu.com 3 15110101012 2019-12-28 16:23:41.0 112.168.1.2 https://www.taobao.com 4 15110101013 2019-12-28 16:23:46.0 112.168.10.2 https://www.baidu.com 5 15110101014 2019-12-28 16:23:52.0 112.168.1.2 https://www.jd.com 6 15110101015 2019-12-28 16:23:56.0 112.168.12.4 https://www.qq.com Time taken: 0.326 seconds, Fetched: 6 row(s) ``` #### 3). 增量匯入資料: 我們再次插入一條資料進入`mysql`的`people_access_log2`表: ```sql insert into people_access_log2(id,user_id, ip, url) values(7,15110101016,'112.168.12.45','https://www.qq.com'); ``` 此時,`mysql`表裡面已經有`7`條資料了,我們使用`incremental`的方式進行增量的匯入到`hive`: ```bash sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log2 \ --hive-import \ --hive-table people_access_log2 \ -m 1 \ --check-column access_time \ --incremental lastmodified \ --last-value "2019-12-28 16:23:56" \ ``` `2019-12-28 16:23:56`就是第6條資料的時間,這裡需要指定。報錯了: ``` 19/12/28 16:17:25 ERROR tool.ImportTool: Error during import: --merge-key or --append is required when using --incremental lastmodified and the output directory exists. ``` **注意**:可以看到`--merge-key or --append is required when using --incremental lastmodified`意思是,這種基於時間匯入模式,需要指定`--merge-key`或者`--append`引數,表示根據時間戳匯入,資料是直接在末尾追加(append)還是合併(merge),這裡使用`merge`方式,根據`id`合併: ```bash sqoop import \ --connect jdbc:mysql://master1.hadoop:3306/test \ --username root \ --password 123456 \ --table people_access_log2 \ --hive-import \ --hive-table web.people_access_log2 \ --check-column access_time \ --incremental lastmodified \ --last-value "2019-12-28 16:23:56" \ --fields-terminated-by ',' \ --merge-key id ``` 執行該命令後,與直接匯入不同,該命令啟動了2個`mapreduce`任務,這樣就把資料增量`merge`匯入`hive`表了. ```sql hive> select * from web.people_access_log2 order by id; OK 1 15110101010 2019-12-28 16:23:10.0 112.168.1.200 https://www.baidu.com 2 15110101011 2019-12-28 16:23:33.0 112.16.1.2 https://www.baidu.com 3 15110101012 2019-12-28 16:23:41.0 112.168.1.2 https://www.taobao.com 4 15110101013 2019-12-28 16:23:46.0 112.168.10.2 https://www.baidu.com 5 15110101014 2019-12-28 16:23:52.0 112.168.1.2 https://www.jd.com 6 15110101015 2019-12-28 16:23:56.0 112.168.12.4 https://www.qq.com 6 15110101015 2019-12-28 16:23:56.0 112.168.12.4 https://www.qq.com 7 15110101016 2019-12-28 16:28:24.0 112.168.12.45 https://www.qq.com Time taken: 0.241 seconds, Fetched: 8 row(s) ``` 可以看到`id=6`的資料,有2條,它的時間剛好是`--last-value`指定的時間,則會匯入**大於等於**`--last-value`指定時間的資料,這點需要